1use std::{
6 os::fd::RawFd,
7 sync::{Arc, Mutex, RwLock},
8};
9
10use bitflags::bitflags;
11use pipewire_native_spa as spa;
12
13use crate::{
14 context::{Context, WeakContext},
15 debug, default_topic, hasproxy_method_call_unlocked, hasproxy_notify, hasproxy_notify_unlocked,
16 id_map::IdMap,
17 keys, log, new_refcounted,
18 properties::Properties,
19 protocol,
20 proxy::{self, HasProxy, Proxy, ProxyEvents},
21 proxy_notify, proxy_object_invoke, refcounted, some_closure, types, HookId, Id, Refcounted,
22};
23
// Selects `log::topic::CORE` as the default log topic for this file.
// NOTE(review): `default_topic!` is defined elsewhere; presumably the `debug!` calls below pick
// this topic up — confirm against the macro definition.
default_topic!(log::topic::CORE);

/// Version number that `Core::new` passes to the core `hello` method.
const VERSION: u32 = 4;

/// Remote name that `get_remote` falls back to when neither the `PIPEWIRE_REMOTE` environment
/// variable nor the supplied properties provide a non-empty name.
const DEFAULT_REMOTE: &str = "pipewire-0";
29
30pub(crate) fn get_remote(props: Option<&spa::dict::Dict>) -> String {
31 std::env::var("PIPEWIRE_REMOTE")
32 .ok()
33 .filter(|v| !v.is_empty())
34 .or_else(|| {
35 props
36 .and_then(|p| p.lookup(keys::REMOTE_NAME).to_owned())
37 .filter(|v| !v.is_empty())
38 .map(|s| s.to_owned())
39 })
40 .unwrap_or(DEFAULT_REMOTE.to_owned())
41}
42
// Handle to the core object of a connection: it owns the protocol client, the core's own
// proxy, and the table of all proxied objects created through it.
//
// NOTE(review): `refcounted!` is defined elsewhere. Judging by `Core::new` (which builds
// `Self { inner: new_refcounted(InnerCore::new(..)) }`) and `impl InnerCore` in this file, it
// wraps the fields below in an `InnerCore` that `Core` shares through `inner` — confirm against
// the macro definition.
refcounted! {
    pub struct Core {
        // Weak reference to the owning context; `Core::context` upgrades it on demand.
        context: WeakContext,
        // Caller-supplied properties with the context's property dict added (see
        // `InnerCore::new`); sent via the client proxy's `update_properties` and passed to
        // `connect` in `Core::new`.
        properties: Properties,
        // Protocol client obtained from the context's protocol; used to connect, disconnect and
        // to reach the underlying connection.
        client: protocol::client::Client,
        // Set by the core proxy's `destroy` listener so that teardown runs only once.
        destroyed: RwLock<bool>,
        // The core's own proxy. `None` only until `Core::new` has stored it.
        proxy: RwLock<Option<Proxy<Core>>>,
        // Proxied objects by id. `Core::new` reserves the first id for the core itself.
        objects: RwLock<IdMap<Box<dyn HasProxy>>>,
        // Core method table produced by `protocol::marshal::core::Methods::marshal`.
        methods: Arc<Mutex<CoreMethods<Core>>>,
        // Listeners registered through `Core::add_listener`.
        hooks: Arc<Mutex<spa::hook::HookList<CoreEvents>>>,
    }
}
56
57impl Core {
58 pub(crate) fn new(context: &Context, properties: Properties) -> std::io::Result<Self> {
59 debug!("Creating new core");
60
61 let this = Self {
62 inner: new_refcounted(InnerCore::new(context, properties)),
63 };
64
65 let id = this.inner.objects.write().unwrap().reserve();
67 let core_proxy = Proxy::new(0, &this);
68 this.inner
69 .proxy
70 .write()
71 .unwrap()
72 .replace(core_proxy.clone());
73 this.inner
74 .objects
75 .write()
76 .unwrap()
77 .insert_at(id, Box::new(this.clone()));
78
79 let client = proxy::client::Client::new(&this);
80 let client_proxy = client.proxy();
81
82 this.inner.client.set_core(this.downgrade());
83
84 core_proxy.add_listener(ProxyEvents {
85 destroy: some_closure!([this] {
86 debug!("core destroy");
87 let mut destroyed = this.inner.destroyed.write().unwrap();
88
89 if *destroyed {
90 return;
91 }
92
93 *destroyed = true;
94
95 let mut objects = this.inner.objects.write().unwrap();
96 let client = objects.get(1).unwrap();
97
98 hasproxy_notify!(client, destroy);
99 objects.clear();
100
101 this.inner.client.disconnect();
102 }),
103 removed: some_closure!([this] {
104 debug!("core removed");
105 for o in this
106 .inner
107 .objects
108 .read()
109 .unwrap()
110 .iter()
111 .skip(1) .map(|(_id, object)| object)
113 {
114 hasproxy_notify!(o, removed)
115 }
116 }),
117 ..Default::default()
118 });
119
120 this.add_listener(CoreEvents {
121 info: some_closure!([this] info, {
122 if let Some(props) = info.props {
123 debug!("updating props {:?}", props);
124 this.context()
125 .update_properties(props, vec!["default.clock.quantum-limit"]);
126 }
127 }),
128 done: some_closure!([core_proxy] id, seq, {
129 debug!("got done: {id} {seq}");
130 let core = core_proxy.object().unwrap();
131 let proxies = core.inner.objects.read().unwrap();
132
133 if let Some(object) = proxies.get(id) {
134 hasproxy_notify_unlocked!(object, proxies, done, seq);
135 }
136 }),
137 error: some_closure!([core_proxy] id, seq, res, message, {
138 debug!("got error: {id} {seq} {res} {message}");
139 let core = core_proxy.object().unwrap();
140 let proxies = core.inner.objects.read().unwrap();
141
142 if let Some(object) = proxies.get(id) {
143 hasproxy_notify_unlocked!(object, proxies, error, seq, res, message);
144 }
145 }),
146 ping: some_closure!([core_proxy] id, seq, {
147 debug!("got ping: {id} {seq}");
148 let _ = proxy_object_invoke!(core_proxy, pong, id, seq);
149 }),
150 remove_id: some_closure!([core_proxy] id, {
151 debug!("got remove_id: {id}");
152 let core = core_proxy.object().unwrap();
153 let mut proxies = core.inner.objects.write().unwrap();
154
155 if let Some(object) = proxies.get(id) {
156 hasproxy_notify!(object, removed);
157 proxies.remove(id);
158 }
159 }),
160 bound_id: some_closure!([core_proxy] id, global_id, {
161 debug!("got bound_id: {id} {global_id}");
162 let core = core_proxy.object().unwrap();
163 let proxies = core.inner.objects.read().unwrap();
164
165 if let Some(object) = proxies.get(id) {
166 hasproxy_method_call_unlocked!(object, proxies, set_bound_id, global_id);
167 }
168 }),
169 add_mem: some_closure!([] _id, _type_, _fd, _flags, {
170 todo!("core.add_mem is not yet implemented")
171 }),
172 remove_mem: some_closure!([] _id, {
173 todo!("core.remove_mem is not yet implemented")
174 }),
175 bound_props: some_closure!([core_proxy] id, global_id, props, {
176 debug!("got bound_props: {id} {global_id} {props:?}");
177 let core = core_proxy.object().unwrap();
178 let proxies = core.inner.objects.read().unwrap();
179
180 if let Some(object) = proxies.get(id) {
181 hasproxy_method_call_unlocked!(object, proxies, set_bound_props, global_id, props);
182 }
183 }),
184 });
185
186 proxy_object_invoke!(core_proxy, hello, VERSION)?;
187
188 proxy_object_invoke!(client_proxy, update_properties, &this.inner.properties)?;
189
190 this.inner
191 .client
192 .connect(Some(&this.inner.properties.dict()), None)?;
193
194 Ok(this)
195 }
196
197 pub fn disconnect(&self) {
202 proxy_notify!(self, removed);
203 proxy_notify!(self, destroy);
204 }
205
206 pub(crate) fn context(&self) -> Context {
207 self.inner
208 .context
209 .upgrade()
210 .expect("Context should outlive core")
211 }
212
213 pub(crate) fn connection(&self) -> protocol::connection::Connection {
214 self.inner.client.connection()
215 }
216
217 pub(crate) fn new_object(&self, type_: &str) -> std::io::Result<Box<dyn HasProxy>> {
218 let new_object: Box<dyn HasProxy> = match type_ {
219 types::interface::CLIENT => Box::new(proxy::client::Client::new(self)),
220 types::interface::DEVICE => Box::new(proxy::device::Device::new(self)),
221 types::interface::FACTORY => Box::new(proxy::factory::Factory::new(self)),
222 types::interface::LINK => Box::new(proxy::link::Link::new(self)),
223 types::interface::METADATA => Box::new(proxy::metadata::Metadata::new(self)),
224 types::interface::MODULE => Box::new(proxy::module::Module::new(self)),
225 types::interface::NODE => Box::new(proxy::node::Node::new(self)),
226 types::interface::PORT => Box::new(proxy::port::Port::new(self)),
227 types::interface::PROFILER => Box::new(proxy::profiler::Profiler::new(self)),
228 _ => {
229 return Err(std::io::Error::new(
230 std::io::ErrorKind::Unsupported,
231 format!("Unsupported proxy type {type_}"),
232 ))
233 }
234 };
235
236 Ok(new_object)
237 }
238
239 pub(crate) fn next_proxy_id(&self) -> Id {
240 self.inner.objects.write().unwrap().reserve()
241 }
242
243 pub(crate) fn add_proxy<T: HasProxy + Refcounted>(&self, object: &T, id: Id) {
244 self.inner
245 .objects
246 .write()
247 .unwrap()
248 .insert_at(id, Box::new(object.clone()));
249 }
250
251 pub(crate) fn find_proxy_type(&self, id: Id) -> Option<types::ObjectType> {
252 self.inner
253 .objects
254 .read()
255 .unwrap()
256 .get(id)
257 .map(|o| o.type_())
258 }
259
260 pub(crate) fn find_proxy<T: HasProxy + Refcounted>(&self, id: Id) -> Option<Proxy<T>> {
261 self.inner
262 .objects
263 .read()
264 .unwrap()
265 .get(id)
266 .and_then(|o| o.downcast_proxy::<T>())
267 }
268
269 pub fn add_listener(&self, events: CoreEvents) -> HookId {
271 self.inner.hooks.lock().unwrap().append(events)
272 }
273
274 pub fn remove_listener(&self, hook_id: HookId) {
276 self.inner.hooks.lock().unwrap().remove(hook_id);
277 }
278
279 pub fn sync(&self) -> std::io::Result<u32> {
281 let proxy = self.proxy();
282 proxy_object_invoke!(proxy, sync, 0)
283 }
284
285 pub fn registry(&self) -> std::io::Result<proxy::registry::Registry> {
288 let proxy = self.proxy();
289 proxy_object_invoke!(proxy, get_registry)
290 }
291
292 pub fn create_object(
294 &self,
295 factory_name: &str,
296 type_: &str,
297 version: u32,
298 props: &Properties,
299 ) -> std::io::Result<Box<dyn HasProxy>> {
300 let proxy = self.proxy();
301 proxy_object_invoke!(proxy, create_object, factory_name, type_, version, props)
302 }
303
304 pub fn destroy(&self, object: &dyn HasProxy) -> std::io::Result<()> {
306 let proxy = self.proxy();
307 proxy_object_invoke!(proxy, destroy, object)
308 }
309
310 pub(crate) fn methods(&self) -> Arc<Mutex<CoreMethods<Core>>> {
311 self.inner.methods.clone()
312 }
313
314 pub(crate) fn events(&self) -> Arc<Mutex<spa::hook::HookList<CoreEvents>>> {
315 self.inner.hooks.clone()
316 }
317}
318
319impl HasProxy for Core {
320 fn type_(&self) -> types::ObjectType {
321 types::interface::CORE
322 }
323
324 fn version(&self) -> u32 {
325 4
326 }
327
328 fn proxy(&self) -> Proxy<Core> {
329 self.inner
330 .proxy
331 .read()
332 .unwrap()
333 .as_ref()
334 .expect("Proxy should be initialised")
335 .clone()
336 }
337}
338
bitflags! {
    /// Bit mask carried in [`CoreInfo::mask`].
    ///
    /// NOTE(review): presumably flags which parts of the core info changed — confirm against
    /// the protocol definition.
    #[repr(C)]
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct CoreChangeMask : u32 {
        /// Bit 0. Presumably signals that the core properties changed — confirm.
        const PROPS = (1 << 0);
    }
}
348
349pub struct CoreInfo<'a> {
351 pub id: u32,
353 pub cookie: u32,
355 pub user_name: &'a str,
357 pub host_name: &'a str,
359 pub version: &'a str,
361 pub name: &'a str,
363 pub mask: CoreChangeMask,
365 pub props: Option<&'a Properties>,
367}
368
/// Table of the methods that can be invoked on a core proxy.
///
/// For `Core`, the table is produced by `protocol::marshal::core::Methods::marshal` (see
/// `InnerCore::new`). Each entry receives the proxy it is invoked on, followed by the
/// method's own arguments.
#[allow(clippy::type_complexity)]
pub(crate) struct CoreMethods<T: HasProxy + Refcounted> {
    /// `hello(version)`; invoked by `Core::new` with `VERSION`.
    pub(crate) hello: Box<dyn FnMut(&Proxy<T>, u32) -> std::io::Result<()>>,
    /// `sync(id)`; invoked by `Core::sync` with id 0.
    pub(crate) sync: Box<dyn FnMut(&Proxy<T>, Id) -> std::io::Result<u32>>,
    /// `pong(id, seq)`; invoked from the `ping` listener installed by `Core::new`.
    pub(crate) pong: Box<dyn FnMut(&Proxy<T>, Id, u32) -> std::io::Result<()>>,
    /// `error(..)`; not invoked anywhere in this file, hence the `unused` allowance.
    #[allow(unused)]
    pub(crate) error: Box<dyn FnMut(&Proxy<T>, u32, u32, &str) -> std::io::Result<()>>,
    /// `get_registry()`; invoked by `Core::registry`.
    pub(crate) get_registry:
        Box<dyn FnMut(&Proxy<T>) -> std::io::Result<proxy::registry::Registry>>,
    /// `create_object(factory_name, type, version, props)`; invoked by `Core::create_object`.
    pub(crate) create_object: Box<
        dyn FnMut(&Proxy<T>, &str, &str, u32, &Properties) -> std::io::Result<Box<dyn HasProxy>>,
    >,
    /// `destroy(object)`; invoked by `Core::destroy`.
    pub(crate) destroy: Box<dyn FnMut(&Proxy<T>, &dyn HasProxy) -> std::io::Result<()>>,
}
383
/// Callbacks for events on the core, registered with `Core::add_listener`.
///
/// Every callback is optional; the derived `Default` leaves all of them unset. Only `info`,
/// `done` and `error` are public; the remaining callbacks are crate-internal and are installed
/// by `Core::new`.
#[allow(clippy::type_complexity)]
#[derive(Default)]
pub struct CoreEvents {
    /// Called with the core info.
    pub info: Option<Box<dyn FnMut(&CoreInfo<'_>) + Send>>,
    /// Called as `done(id, seq)`.
    pub done: Option<Box<dyn FnMut(Id, u32) + Send>>,
    /// Called as `error(id, seq, res, message)`.
    pub error: Option<Box<dyn FnMut(Id, u32, u32, &str) + Send>>,
    /// Called as `ping(id, seq)`; `Core::new` answers it by invoking `pong`.
    pub(crate) ping: Option<Box<dyn FnMut(Id, u32) + Send>>,
    /// Called as `remove_id(id)`; `Core::new` notifies and drops the object stored under `id`.
    pub(crate) remove_id: Option<Box<dyn FnMut(Id) + Send>>,
    /// Called as `bound_id(id, global_id)`; `Core::new` passes `global_id` to the object's
    /// `set_bound_id`.
    pub(crate) bound_id: Option<Box<dyn FnMut(Id, Id) + Send>>,
    /// Called as `add_mem(id, type, fd, flags)`; the handler in `Core::new` is still a `todo!`.
    #[allow(unused)]
    pub(crate) add_mem: Option<Box<dyn FnMut(Id, u32, RawFd, u32) + Send>>,
    /// Called as `remove_mem(id)`; the handler in `Core::new` is still a `todo!`.
    #[allow(unused)]
    pub(crate) remove_mem: Option<Box<dyn FnMut(Id) + Send>>,
    /// Called as `bound_props(id, global_id, props)`; `Core::new` passes both values to the
    /// object's `set_bound_props`.
    pub(crate) bound_props: Option<Box<dyn FnMut(Id, Id, &Properties) + Send>>,
}
403
404impl InnerCore {
405 fn new(context: &Context, mut properties: Properties) -> Self {
406 properties.add_dict(&context.properties_dict());
407
408 let client = context.protocol().new_client(None);
411 let connection = client.connection();
412
413 Self {
414 context: context.downgrade(),
415 properties,
416 client,
417 destroyed: RwLock::new(false),
418 proxy: RwLock::new(None),
419 objects: RwLock::new(IdMap::new()),
420 methods: Arc::new(Mutex::new(protocol::marshal::core::Methods::marshal(
421 connection,
422 ))),
423 hooks: spa::hook::HookList::new(),
424 }
425 }
426}