pipewire_native/
core.rs

1// SPDX-License-Identifier: MIT
2// SPDX-FileCopyrightText: Copyright (c) 2025 Asymptotic Inc.
3// SPDX-FileCopyrightText: Copyright (c) 2025 Arun Raghavan
4
5use std::{
6    os::fd::RawFd,
7    sync::{Arc, Mutex, RwLock},
8};
9
10use bitflags::bitflags;
11use pipewire_native_spa as spa;
12
13use crate::{
14    context::{Context, WeakContext},
15    debug, default_topic, hasproxy_method_call_unlocked, hasproxy_notify, hasproxy_notify_unlocked,
16    id_map::IdMap,
17    keys, log, new_refcounted,
18    properties::Properties,
19    protocol,
20    proxy::{self, HasProxy, Proxy, ProxyEvents},
21    proxy_notify, proxy_object_invoke, refcounted, some_closure, types, HookId, Id, Refcounted,
22};
23
// Presumably selects the log topic used by the logging macros (`debug!`) in this file.
default_topic!(log::topic::CORE);

// Version number passed to the core `hello` method when a `Core` is created.
const VERSION: u32 = 4;

// Remote name `get_remote` falls back to when neither the environment nor the
// supplied properties provide a non-empty name.
const DEFAULT_REMOTE: &str = "pipewire-0";
29
30pub(crate) fn get_remote(props: Option<&spa::dict::Dict>) -> String {
31    std::env::var("PIPEWIRE_REMOTE")
32        .ok()
33        .filter(|v| !v.is_empty())
34        .or_else(|| {
35            props
36                .and_then(|p| p.lookup(keys::REMOTE_NAME).to_owned())
37                .filter(|v| !v.is_empty())
38                .map(|s| s.to_owned())
39        })
40        .unwrap_or(DEFAULT_REMOTE.to_owned())
41}
42
refcounted! {
    /// A singleton object representing the connection between the client and the PipeWire server.
    pub struct Core {
        // Weak handle to the owning context; upgraded on demand by `Core::context`.
        context: WeakContext,
        // Properties supplied at creation, merged with the context's properties dict.
        properties: Properties,
        // Protocol-level client used to connect to and disconnect from the server.
        client: protocol::client::Client,
        // Set by the core proxy's `destroy` listener so teardown only runs once.
        destroyed: RwLock<bool>,
        // The core's own proxy; `None` only until `Core::new` has stored it.
        proxy: RwLock<Option<Proxy<Core>>>,
        // All proxied objects tracked by this core, keyed by proxy id (the core itself is id 0).
        objects: RwLock<IdMap<Box<dyn HasProxy>>>,
        // Method table produced by `protocol::marshal::core::Methods::marshal`.
        methods: Arc<Mutex<CoreMethods<Core>>>,
        // Registered `CoreEvents` listeners.
        hooks: Arc<Mutex<spa::hook::HookList<CoreEvents>>>,
    }
}
56
57impl Core {
58    pub(crate) fn new(context: &Context, properties: Properties) -> std::io::Result<Self> {
59        debug!("Creating new core");
60
61        let this = Self {
62            inner: new_refcounted(InnerCore::new(context, properties)),
63        };
64
65        // Reserve id 0 because we are id 0
66        let id = this.inner.objects.write().unwrap().reserve();
67        let core_proxy = Proxy::new(0, &this);
68        this.inner
69            .proxy
70            .write()
71            .unwrap()
72            .replace(core_proxy.clone());
73        this.inner
74            .objects
75            .write()
76            .unwrap()
77            .insert_at(id, Box::new(this.clone()));
78
79        let client = proxy::client::Client::new(&this);
80        let client_proxy = client.proxy();
81
82        this.inner.client.set_core(this.downgrade());
83
84        core_proxy.add_listener(ProxyEvents {
85            destroy: some_closure!([this] {
86                debug!("core destroy");
87                let mut destroyed = this.inner.destroyed.write().unwrap();
88
89                if *destroyed {
90                    return;
91                }
92
93                *destroyed = true;
94
95                let mut objects = this.inner.objects.write().unwrap();
96                let client = objects.get(1).unwrap();
97
98                hasproxy_notify!(client, destroy);
99                objects.clear();
100
101                this.inner.client.disconnect();
102            }),
103            removed: some_closure!([this] {
104                debug!("core removed");
105                for o in this
106                    .inner
107                    .objects
108                    .read()
109                    .unwrap()
110                    .iter()
111                    .skip(1) // first object is core, so skip it
112                    .map(|(_id, object)| object)
113                {
114                    hasproxy_notify!(o, removed)
115                }
116            }),
117            ..Default::default()
118        });
119
120        this.add_listener(CoreEvents {
121            info: some_closure!([this] info, {
122                if let Some(props) = info.props {
123                    debug!("updating props {:?}", props);
124                    this.context()
125                        .update_properties(props, vec!["default.clock.quantum-limit"]);
126                }
127            }),
128            done: some_closure!([core_proxy] id, seq, {
129                debug!("got done: {id} {seq}");
130                let core = core_proxy.object().unwrap();
131                let proxies = core.inner.objects.read().unwrap();
132
133                if let Some(object) = proxies.get(id) {
134                    hasproxy_notify_unlocked!(object, proxies, done, seq);
135                }
136            }),
137            error: some_closure!([core_proxy] id, seq, res, message, {
138                debug!("got error: {id} {seq} {res} {message}");
139                let core = core_proxy.object().unwrap();
140                let proxies = core.inner.objects.read().unwrap();
141
142                if let Some(object) = proxies.get(id) {
143                    hasproxy_notify_unlocked!(object, proxies, error, seq, res, message);
144                }
145            }),
146            ping: some_closure!([core_proxy] id, seq, {
147                debug!("got ping: {id} {seq}");
148                let _ = proxy_object_invoke!(core_proxy, pong, id, seq);
149            }),
150            remove_id: some_closure!([core_proxy] id, {
151                debug!("got remove_id: {id}");
152                let core = core_proxy.object().unwrap();
153                let mut proxies = core.inner.objects.write().unwrap();
154
155                if let Some(object) = proxies.get(id) {
156                    hasproxy_notify!(object, removed);
157                    proxies.remove(id);
158                }
159            }),
160            bound_id: some_closure!([core_proxy] id, global_id, {
161                debug!("got bound_id: {id} {global_id}");
162                let core = core_proxy.object().unwrap();
163                let proxies = core.inner.objects.read().unwrap();
164
165                if let Some(object) = proxies.get(id) {
166                    hasproxy_method_call_unlocked!(object, proxies, set_bound_id, global_id);
167                }
168            }),
169            add_mem: some_closure!([] _id, _type_, _fd, _flags, {
170                todo!("core.add_mem is not yet implemented")
171            }),
172            remove_mem: some_closure!([] _id, {
173                todo!("core.remove_mem is not yet implemented")
174            }),
175            bound_props: some_closure!([core_proxy] id, global_id, props, {
176                debug!("got bound_props: {id} {global_id} {props:?}");
177                let core = core_proxy.object().unwrap();
178                let proxies = core.inner.objects.read().unwrap();
179
180                if let Some(object) = proxies.get(id) {
181                    hasproxy_method_call_unlocked!(object, proxies, set_bound_props, global_id, props);
182                }
183            }),
184        });
185
186        proxy_object_invoke!(core_proxy, hello, VERSION)?;
187
188        proxy_object_invoke!(client_proxy, update_properties, &this.inner.properties)?;
189
190        this.inner
191            .client
192            .connect(Some(&this.inner.properties.dict()), None)?;
193
194        Ok(this)
195    }
196
    /// Disconnect connection with the PipeWire server. This will immediately trigger the `removed`
    /// and `destroy` events on all tracked proxies. Callers should ensure that this will not
    /// result in deadlocks with their own synchronisation primitives (for example, taking a lock
    /// before disconnecting that is also taken in either of those callbacks).
    ///
    /// The `removed` notification is emitted before `destroy`. The listeners that `Core::new`
    /// installs on the core proxy react to them: `removed` is forwarded to every other tracked
    /// object, and `destroy` clears the object map and disconnects the protocol client. The
    /// `destroy` handler is guarded by a flag, so its teardown runs at most once.
    pub fn disconnect(&self) {
        proxy_notify!(self, removed);
        proxy_notify!(self, destroy);
    }
205
206    pub(crate) fn context(&self) -> Context {
207        self.inner
208            .context
209            .upgrade()
210            .expect("Context should outlive core")
211    }
212
213    pub(crate) fn connection(&self) -> protocol::connection::Connection {
214        self.inner.client.connection()
215    }
216
217    pub(crate) fn new_object(&self, type_: &str) -> std::io::Result<Box<dyn HasProxy>> {
218        let new_object: Box<dyn HasProxy> = match type_ {
219            types::interface::CLIENT => Box::new(proxy::client::Client::new(self)),
220            types::interface::DEVICE => Box::new(proxy::device::Device::new(self)),
221            types::interface::FACTORY => Box::new(proxy::factory::Factory::new(self)),
222            types::interface::LINK => Box::new(proxy::link::Link::new(self)),
223            types::interface::METADATA => Box::new(proxy::metadata::Metadata::new(self)),
224            types::interface::MODULE => Box::new(proxy::module::Module::new(self)),
225            types::interface::NODE => Box::new(proxy::node::Node::new(self)),
226            types::interface::PORT => Box::new(proxy::port::Port::new(self)),
227            _ => {
228                return Err(std::io::Error::new(
229                    std::io::ErrorKind::Unsupported,
230                    format!("Unsupported proxy type {type_}"),
231                ))
232            }
233        };
234
235        Ok(new_object)
236    }
237
238    pub(crate) fn next_proxy_id(&self) -> Id {
239        self.inner.objects.write().unwrap().reserve()
240    }
241
242    pub(crate) fn add_proxy<T: HasProxy + Refcounted>(&self, object: &T, id: Id) {
243        self.inner
244            .objects
245            .write()
246            .unwrap()
247            .insert_at(id, Box::new(object.clone()));
248    }
249
250    pub(crate) fn find_proxy_type(&self, id: Id) -> Option<types::ObjectType> {
251        self.inner
252            .objects
253            .read()
254            .unwrap()
255            .get(id)
256            .map(|o| o.type_())
257    }
258
259    pub(crate) fn find_proxy<T: HasProxy + Refcounted>(&self, id: Id) -> Option<Proxy<T>> {
260        self.inner
261            .objects
262            .read()
263            .unwrap()
264            .get(id)
265            .and_then(|o| o.downcast_proxy::<T>())
266    }
267
268    /// Listen for events on the core object.
269    pub fn add_listener(&self, events: CoreEvents) -> HookId {
270        self.inner.hooks.lock().unwrap().append(events)
271    }
272
273    /// Remove a set of event listeners.
274    pub fn remove_listener(&self, hook_id: HookId) {
275        self.inner.hooks.lock().unwrap().remove(hook_id);
276    }
277
278    /// Trigger a `sync` message to the server, flushing all pending messages.
279    pub fn sync(&self) -> std::io::Result<u32> {
280        let proxy = self.proxy();
281        proxy_object_invoke!(proxy, sync, 0)
282    }
283
284    /// Retrieve a [Registry](proxy::registry::Registry). This can be used to query and track
285    /// objects exposed by the server.
286    pub fn registry(&self) -> std::io::Result<proxy::registry::Registry> {
287        let proxy = self.proxy();
288        proxy_object_invoke!(proxy, get_registry)
289    }
290
291    /// Create an object of the given factory type on the server.
292    pub fn create_object(
293        &self,
294        factory_name: &str,
295        type_: &str,
296        version: u32,
297        props: &Properties,
298    ) -> std::io::Result<Box<dyn HasProxy>> {
299        let proxy = self.proxy();
300        proxy_object_invoke!(proxy, create_object, factory_name, type_, version, props)
301    }
302
303    /// Destroy a proxy.
304    pub fn destroy(&self, object: &dyn HasProxy) -> std::io::Result<()> {
305        let proxy = self.proxy();
306        proxy_object_invoke!(proxy, destroy, object)
307    }
308
309    pub(crate) fn methods(&self) -> Arc<Mutex<CoreMethods<Core>>> {
310        self.inner.methods.clone()
311    }
312
313    pub(crate) fn events(&self) -> Arc<Mutex<spa::hook::HookList<CoreEvents>>> {
314        self.inner.hooks.clone()
315    }
316}
317
318impl HasProxy for Core {
319    fn type_(&self) -> types::ObjectType {
320        types::interface::CORE
321    }
322
323    fn version(&self) -> u32 {
324        4
325    }
326
327    fn proxy(&self) -> Proxy<Core> {
328        self.inner
329            .proxy
330            .read()
331            .unwrap()
332            .as_ref()
333            .expect("Proxy should be initialised")
334            .clone()
335    }
336}
337
bitflags! {
    /// Indicates what changes are being signalled in a [CoreEvents::info] event.
    ///
    /// The mask is stored in a `u32`; bit 0 is the only flag defined here.
    #[repr(C)]
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct CoreChangeMask : u32 {
        /// Properties changed.
        const PROPS = (1 << 0);
    }
}
347
348/// Provides [Core]-related information in the [CoreEvents::info] event.
349pub struct CoreInfo<'a> {
350    /// Id of the core.
351    pub id: u32,
352    /// Random cookie to identify this instance.
353    pub cookie: u32,
354    /// User name of the user who started the core.
355    pub user_name: &'a str,
356    /// Host name on which the core is running.
357    pub host_name: &'a str,
358    /// Interface version of the core.
359    pub version: &'a str,
360    /// Name of the core.
361    pub name: &'a str,
362    /// Set of changes since the last call.
363    pub mask: CoreChangeMask,
364    /// Properties of the core.
365    pub props: Option<&'a Properties>,
366}
367
/// Table of callable core methods, one boxed closure per method.
///
/// Each closure receives the proxy the method is invoked on, followed by the method's
/// arguments. The table stored in `Core` is produced by
/// `protocol::marshal::core::Methods::marshal`.
// The boxed closure signatures are long by nature, hence the lint exemption.
#[allow(clippy::type_complexity)]
pub(crate) struct CoreMethods<T: HasProxy + Refcounted> {
    /// `hello`: invoked by `Core::new` with `VERSION`.
    pub(crate) hello: Box<dyn FnMut(&Proxy<T>, u32) -> std::io::Result<()>>,
    /// `sync`: invoked by `Core::sync`; takes an id and returns a `u32`.
    pub(crate) sync: Box<dyn FnMut(&Proxy<T>, Id) -> std::io::Result<u32>>,
    /// `pong`: invoked from the `ping` event handler with the id and sequence of the ping.
    pub(crate) pong: Box<dyn FnMut(&Proxy<T>, Id, u32) -> std::io::Result<()>>,
    /// `error`: not invoked anywhere in this file, hence `allow(unused)`.
    // NOTE(review): the meaning of the two `u32` arguments is not visible here — confirm
    // against the marshalling code.
    #[allow(unused)]
    pub(crate) error: Box<dyn FnMut(&Proxy<T>, u32, u32, &str) -> std::io::Result<()>>,
    /// `get_registry`: invoked by `Core::registry`.
    pub(crate) get_registry:
        Box<dyn FnMut(&Proxy<T>) -> std::io::Result<proxy::registry::Registry>>,
    /// `create_object`: invoked by `Core::create_object` with factory name, type, version and
    /// properties, in that order.
    pub(crate) create_object: Box<
        dyn FnMut(&Proxy<T>, &str, &str, u32, &Properties) -> std::io::Result<Box<dyn HasProxy>>,
    >,
    /// `destroy`: invoked by `Core::destroy` with the object to destroy.
    pub(crate) destroy: Box<dyn FnMut(&Proxy<T>, &dyn HasProxy) -> std::io::Result<()>>,
}
382
/// Events that may be emitted by a [Core] proxy object.
///
/// Every handler is optional; `Default` leaves all of them unset. Only `info`, `done` and
/// `error` are public. The remaining handlers are crate-internal and are installed by
/// `Core::new`.
// The boxed closure signatures are long by nature, hence the lint exemption.
#[allow(clippy::type_complexity)]
#[derive(Default)]
pub struct CoreEvents {
    /// Information about the core changed.
    pub info: Option<Box<dyn FnMut(&CoreInfo<'_>) + Send>>,
    /// A core operation was completed.
    pub done: Option<Box<dyn FnMut(Id, u32) + Send>>,
    /// An error occurred on the core.
    pub error: Option<Box<dyn FnMut(Id, u32, u32, &str) + Send>>,
    /// Ping carrying an id and a sequence number; the core answers it with `pong`.
    pub(crate) ping: Option<Box<dyn FnMut(Id, u32) + Send>>,
    /// The proxy with the given id should be removed from the core's object map.
    pub(crate) remove_id: Option<Box<dyn FnMut(Id) + Send>>,
    /// A proxy id (first argument) was bound to a global id (second argument).
    pub(crate) bound_id: Option<Box<dyn FnMut(Id, Id) + Send>>,
    /// Memory was added; the handler installed by the core is not implemented yet.
    // NOTE(review): presumably marked unused because nothing dispatches this event yet — confirm.
    #[allow(unused)]
    pub(crate) add_mem: Option<Box<dyn FnMut(Id, u32, RawFd, u32) + Send>>,
    /// Memory was removed; the handler installed by the core is not implemented yet.
    // NOTE(review): presumably marked unused because nothing dispatches this event yet — confirm.
    #[allow(unused)]
    pub(crate) remove_mem: Option<Box<dyn FnMut(Id) + Send>>,
    /// Like `bound_id`, but additionally carries properties for the bound object.
    pub(crate) bound_props: Option<Box<dyn FnMut(Id, Id, &Properties) + Send>>,
}
402
403impl InnerCore {
404    fn new(context: &Context, mut properties: Properties) -> Self {
405        properties.add_dict(&context.properties_dict());
406
407        // TODO: Create mempool
408
409        let client = context.protocol().new_client(None);
410        let connection = client.connection();
411
412        Self {
413            context: context.downgrade(),
414            properties,
415            client,
416            destroyed: RwLock::new(false),
417            proxy: RwLock::new(None),
418            objects: RwLock::new(IdMap::new()),
419            methods: Arc::new(Mutex::new(protocol::marshal::core::Methods::marshal(
420                connection,
421            ))),
422            hooks: spa::hook::HookList::new(),
423        }
424    }
425}