1use std::{
6 os::fd::RawFd,
7 sync::{Arc, Mutex, RwLock},
8};
9
10use bitflags::bitflags;
11use pipewire_native_spa as spa;
12
13use crate::{
14 context::{Context, WeakContext},
15 debug, default_topic, hasproxy_method_call_unlocked, hasproxy_notify, hasproxy_notify_unlocked,
16 id_map::IdMap,
17 keys, log, new_refcounted,
18 properties::Properties,
19 protocol,
20 proxy::{self, HasProxy, Proxy, ProxyEvents},
21 proxy_notify, proxy_object_invoke, refcounted, some_closure, types, HookId, Id, Refcounted,
22};
23
// Declare the log topic for this file (presumably picked up by the `debug!` calls below).
default_topic!(log::topic::CORE);

/// Core interface version; passed to the `hello` method when a core is created.
const VERSION: u32 = 4;

/// Remote name `get_remote` falls back to when no other source provides a non-empty name.
const DEFAULT_REMOTE: &str = "pipewire-0";
29
30pub(crate) fn get_remote(props: Option<&spa::dict::Dict>) -> String {
31 std::env::var("PIPEWIRE_REMOTE")
32 .ok()
33 .filter(|v| !v.is_empty())
34 .or_else(|| {
35 props
36 .and_then(|p| p.lookup(keys::REMOTE_NAME).to_owned())
37 .filter(|v| !v.is_empty())
38 .map(|s| s.to_owned())
39 })
40 .unwrap_or(DEFAULT_REMOTE.to_owned())
41}
42
// `refcounted!` is a crate macro. Judging by the `inner` field and the `InnerCore` type used
// further down, it places these fields in a shared `InnerCore` behind the public `Core` handle.
refcounted! {
    pub struct Core {
        // Weak handle to the owning context; upgraded on demand by `Core::context`.
        context: WeakContext,
        // Caller-supplied properties with the context's properties added in `InnerCore::new`.
        properties: Properties,
        // Protocol client used to connect to and disconnect from the remote.
        client: protocol::client::Client,
        // Set by the core proxy's destroy handler so that teardown runs only once.
        destroyed: RwLock<bool>,
        // The core's own proxy; `None` only until `Core::new` installs it.
        proxy: RwLock<Option<Proxy<Core>>>,
        // Proxied objects indexed by proxy id; the core stores itself here as well.
        objects: RwLock<IdMap<Box<dyn HasProxy>>>,
        // Marshalled core methods (presumably what `proxy_object_invoke!` dispatches to).
        methods: Arc<Mutex<CoreMethods<Core>>>,
        // Registered `CoreEvents` listeners.
        hooks: Arc<Mutex<spa::hook::HookList<CoreEvents>>>,
    }
}
56
57impl Core {
58 pub(crate) fn new(context: &Context, properties: Properties) -> std::io::Result<Self> {
59 debug!("Creating new core");
60
61 let this = Self {
62 inner: new_refcounted(InnerCore::new(context, properties)),
63 };
64
65 let id = this.inner.objects.write().unwrap().reserve();
67 let core_proxy = Proxy::new(0, &this);
68 this.inner
69 .proxy
70 .write()
71 .unwrap()
72 .replace(core_proxy.clone());
73 this.inner
74 .objects
75 .write()
76 .unwrap()
77 .insert_at(id, Box::new(this.clone()));
78
79 let client = proxy::client::Client::new(&this);
80 let client_proxy = client.proxy();
81
82 this.inner.client.set_core(this.downgrade());
83
84 core_proxy.add_listener(ProxyEvents {
85 destroy: some_closure!([this] {
86 debug!("core destroy");
87 let mut destroyed = this.inner.destroyed.write().unwrap();
88
89 if *destroyed {
90 return;
91 }
92
93 *destroyed = true;
94
95 let mut objects = this.inner.objects.write().unwrap();
96 let client = objects.get(1).unwrap();
97
98 hasproxy_notify!(client, destroy);
99 objects.clear();
100
101 this.inner.client.disconnect();
102 }),
103 removed: some_closure!([this] {
104 debug!("core removed");
105 for o in this
106 .inner
107 .objects
108 .read()
109 .unwrap()
110 .iter()
111 .skip(1) .map(|(_id, object)| object)
113 {
114 hasproxy_notify!(o, removed)
115 }
116 }),
117 ..Default::default()
118 });
119
120 this.add_listener(CoreEvents {
121 info: some_closure!([this] info, {
122 if let Some(props) = info.props {
123 debug!("updating props {:?}", props);
124 this.context()
125 .update_properties(props, vec!["default.clock.quantum-limit"]);
126 }
127 }),
128 done: some_closure!([core_proxy] id, seq, {
129 debug!("got done: {id} {seq}");
130 let core = core_proxy.object().unwrap();
131 let proxies = core.inner.objects.read().unwrap();
132
133 if let Some(object) = proxies.get(id) {
134 hasproxy_notify_unlocked!(object, proxies, done, seq);
135 }
136 }),
137 error: some_closure!([core_proxy] id, seq, res, message, {
138 debug!("got error: {id} {seq} {res} {message}");
139 let core = core_proxy.object().unwrap();
140 let proxies = core.inner.objects.read().unwrap();
141
142 if let Some(object) = proxies.get(id) {
143 hasproxy_notify_unlocked!(object, proxies, error, seq, res, message);
144 }
145 }),
146 ping: some_closure!([core_proxy] id, seq, {
147 debug!("got ping: {id} {seq}");
148 let _ = proxy_object_invoke!(core_proxy, pong, id, seq);
149 }),
150 remove_id: some_closure!([core_proxy] id, {
151 debug!("got remove_id: {id}");
152 let core = core_proxy.object().unwrap();
153 let mut proxies = core.inner.objects.write().unwrap();
154
155 if let Some(object) = proxies.get(id) {
156 hasproxy_notify!(object, removed);
157 proxies.remove(id);
158 }
159 }),
160 bound_id: some_closure!([core_proxy] id, global_id, {
161 debug!("got bound_id: {id} {global_id}");
162 let core = core_proxy.object().unwrap();
163 let proxies = core.inner.objects.read().unwrap();
164
165 if let Some(object) = proxies.get(id) {
166 hasproxy_method_call_unlocked!(object, proxies, set_bound_id, global_id);
167 }
168 }),
169 add_mem: some_closure!([] _id, _type_, _fd, _flags, {
170 todo!("core.add_mem is not yet implemented")
171 }),
172 remove_mem: some_closure!([] _id, {
173 todo!("core.remove_mem is not yet implemented")
174 }),
175 bound_props: some_closure!([core_proxy] id, global_id, props, {
176 debug!("got bound_props: {id} {global_id} {props:?}");
177 let core = core_proxy.object().unwrap();
178 let proxies = core.inner.objects.read().unwrap();
179
180 if let Some(object) = proxies.get(id) {
181 hasproxy_method_call_unlocked!(object, proxies, set_bound_props, global_id, props);
182 }
183 }),
184 });
185
186 proxy_object_invoke!(core_proxy, hello, VERSION)?;
187
188 proxy_object_invoke!(client_proxy, update_properties, &this.inner.properties)?;
189
190 this.inner
191 .client
192 .connect(Some(&this.inner.properties.dict()), None)?;
193
194 Ok(this)
195 }
196
    /// Tears the core down by emitting `removed` and then `destroy` for it.
    ///
    /// The proxy listeners installed in `Core::new` handle these: `removed` is forwarded to the
    /// other registered objects, and `destroy` clears the object map and disconnects the
    /// protocol client. The order of the two notifications is significant.
    pub fn disconnect(&self) {
        proxy_notify!(self, removed);
        proxy_notify!(self, destroy);
    }
205
206 pub(crate) fn context(&self) -> Context {
207 self.inner
208 .context
209 .upgrade()
210 .expect("Context should outlive core")
211 }
212
213 pub(crate) fn connection(&self) -> protocol::connection::Connection {
214 self.inner.client.connection()
215 }
216
217 pub(crate) fn new_object(&self, type_: &str) -> std::io::Result<Box<dyn HasProxy>> {
218 let new_object: Box<dyn HasProxy> = match type_ {
219 types::interface::CLIENT => Box::new(proxy::client::Client::new(self)),
220 types::interface::DEVICE => Box::new(proxy::device::Device::new(self)),
221 types::interface::FACTORY => Box::new(proxy::factory::Factory::new(self)),
222 types::interface::LINK => Box::new(proxy::link::Link::new(self)),
223 types::interface::METADATA => Box::new(proxy::metadata::Metadata::new(self)),
224 types::interface::MODULE => Box::new(proxy::module::Module::new(self)),
225 types::interface::NODE => Box::new(proxy::node::Node::new(self)),
226 types::interface::PORT => Box::new(proxy::port::Port::new(self)),
227 _ => {
228 return Err(std::io::Error::new(
229 std::io::ErrorKind::Unsupported,
230 format!("Unsupported proxy type {type_}"),
231 ))
232 }
233 };
234
235 Ok(new_object)
236 }
237
238 pub(crate) fn next_proxy_id(&self) -> Id {
239 self.inner.objects.write().unwrap().reserve()
240 }
241
242 pub(crate) fn add_proxy<T: HasProxy + Refcounted>(&self, object: &T, id: Id) {
243 self.inner
244 .objects
245 .write()
246 .unwrap()
247 .insert_at(id, Box::new(object.clone()));
248 }
249
250 pub(crate) fn find_proxy_type(&self, id: Id) -> Option<types::ObjectType> {
251 self.inner
252 .objects
253 .read()
254 .unwrap()
255 .get(id)
256 .map(|o| o.type_())
257 }
258
259 pub(crate) fn find_proxy<T: HasProxy + Refcounted>(&self, id: Id) -> Option<Proxy<T>> {
260 self.inner
261 .objects
262 .read()
263 .unwrap()
264 .get(id)
265 .and_then(|o| o.downcast_proxy::<T>())
266 }
267
268 pub fn add_listener(&self, events: CoreEvents) -> HookId {
270 self.inner.hooks.lock().unwrap().append(events)
271 }
272
273 pub fn remove_listener(&self, hook_id: HookId) {
275 self.inner.hooks.lock().unwrap().remove(hook_id);
276 }
277
    /// Invokes the core `sync` method with id `0` through the core proxy.
    ///
    /// Returns whatever `u32` the method yields — presumably the sequence number later reported
    /// by the matching `done` event; confirm against the marshalling code.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error returned by the method invocation.
    pub fn sync(&self) -> std::io::Result<u32> {
        let proxy = self.proxy();
        proxy_object_invoke!(proxy, sync, 0)
    }
283
    /// Invokes the core `get_registry` method through the core proxy and returns the resulting
    /// registry object.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error returned by the method invocation.
    pub fn registry(&self) -> std::io::Result<proxy::registry::Registry> {
        let proxy = self.proxy();
        proxy_object_invoke!(proxy, get_registry)
    }
290
    /// Invokes the core `create_object` method through the core proxy.
    ///
    /// * `factory_name` - name of the factory that should create the object.
    /// * `type_` - interface type of the object to create.
    /// * `version` - interface version requested for the object.
    /// * `props` - properties passed along with the request.
    ///
    /// Returns the boxed proxied object produced by the method.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error returned by the method invocation.
    pub fn create_object(
        &self,
        factory_name: &str,
        type_: &str,
        version: u32,
        props: &Properties,
    ) -> std::io::Result<Box<dyn HasProxy>> {
        let proxy = self.proxy();
        proxy_object_invoke!(proxy, create_object, factory_name, type_, version, props)
    }
302
    /// Invokes the core `destroy` method for `object` through the core proxy.
    ///
    /// Presumably this asks the remote to destroy the object behind the given proxy; confirm
    /// against the marshalling code.
    ///
    /// # Errors
    ///
    /// Propagates the I/O error returned by the method invocation.
    pub fn destroy(&self, object: &dyn HasProxy) -> std::io::Result<()> {
        let proxy = self.proxy();
        proxy_object_invoke!(proxy, destroy, object)
    }
308
309 pub(crate) fn methods(&self) -> Arc<Mutex<CoreMethods<Core>>> {
310 self.inner.methods.clone()
311 }
312
313 pub(crate) fn events(&self) -> Arc<Mutex<spa::hook::HookList<CoreEvents>>> {
314 self.inner.hooks.clone()
315 }
316}
317
318impl HasProxy for Core {
319 fn type_(&self) -> types::ObjectType {
320 types::interface::CORE
321 }
322
323 fn version(&self) -> u32 {
324 4
325 }
326
327 fn proxy(&self) -> Proxy<Core> {
328 self.inner
329 .proxy
330 .read()
331 .unwrap()
332 .as_ref()
333 .expect("Proxy should be initialised")
334 .clone()
335 }
336}
337
bitflags! {
    /// Bit mask stored in `CoreInfo::mask`; presumably flags which parts of the info changed.
    #[repr(C)]
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct CoreChangeMask : u32 {
        /// Bit 0. Presumably set when `CoreInfo::props` changed — confirm against the protocol.
        const PROPS = (1 << 0);
    }
}
347
/// Borrowed view of the information delivered with the core `info` event.
///
/// NOTE(review): the field meanings below are inferred from the field names (they appear to
/// mirror PipeWire's core info); confirm against the demarshalling code.
pub struct CoreInfo<'a> {
    /// Id of the core object (presumably its global id).
    pub id: u32,
    /// Cookie value reported by the core (presumably identifies the server instance).
    pub cookie: u32,
    /// User name reported by the core.
    pub user_name: &'a str,
    /// Host name reported by the core.
    pub host_name: &'a str,
    /// Version string reported by the core.
    pub version: &'a str,
    /// Name reported by the core.
    pub name: &'a str,
    /// Change mask accompanying this info.
    pub mask: CoreChangeMask,
    /// Properties of the core, if any were supplied with this info.
    pub props: Option<&'a Properties>,
}
367
/// Table of the core's method implementations, each taking the proxy it is invoked on.
///
/// The table held by `InnerCore` is built by `protocol::marshal::core::Methods::marshal`.
#[allow(clippy::type_complexity)]
pub(crate) struct CoreMethods<T: HasProxy + Refcounted> {
    /// `hello(version)`: invoked once by `Core::new` with `VERSION`.
    pub(crate) hello: Box<dyn FnMut(&Proxy<T>, u32) -> std::io::Result<()>>,
    /// `sync(id)`: returns a `u32` (presumably the sequence number of the request).
    pub(crate) sync: Box<dyn FnMut(&Proxy<T>, Id) -> std::io::Result<u32>>,
    /// `pong(id, seq)`: reply to a `ping` event.
    pub(crate) pong: Box<dyn FnMut(&Proxy<T>, Id, u32) -> std::io::Result<()>>,
    /// `error(..)`: not called from this file, hence the `allow(unused)`.
    #[allow(unused)]
    pub(crate) error: Box<dyn FnMut(&Proxy<T>, u32, u32, &str) -> std::io::Result<()>>,
    /// `get_registry()`: returns the registry object.
    pub(crate) get_registry:
        Box<dyn FnMut(&Proxy<T>) -> std::io::Result<proxy::registry::Registry>>,
    /// `create_object(factory_name, type_, version, props)`: returns the created proxied object.
    pub(crate) create_object: Box<
        dyn FnMut(&Proxy<T>, &str, &str, u32, &Properties) -> std::io::Result<Box<dyn HasProxy>>,
    >,
    /// `destroy(object)`: invoked by `Core::destroy` for the given proxied object.
    pub(crate) destroy: Box<dyn FnMut(&Proxy<T>, &dyn HasProxy) -> std::io::Result<()>>,
}
382
/// Callbacks for events emitted by the core.
///
/// Every callback is optional; `Default` leaves all of them unset. Only `info`, `done` and
/// `error` are public; the rest are handled inside the crate (see the listener installed by
/// `Core::new`).
#[allow(clippy::type_complexity)]
#[derive(Default)]
pub struct CoreEvents {
    /// Called with the core info.
    pub info: Option<Box<dyn FnMut(&CoreInfo<'_>) + Send>>,
    /// Called as `done(id, seq)`.
    pub done: Option<Box<dyn FnMut(Id, u32) + Send>>,
    /// Called as `error(id, seq, res, message)`.
    pub error: Option<Box<dyn FnMut(Id, u32, u32, &str) + Send>>,
    /// Called as `ping(id, seq)`; the built-in handler answers with `pong`.
    pub(crate) ping: Option<Box<dyn FnMut(Id, u32) + Send>>,
    /// Called as `remove_id(id)`; the built-in handler drops that proxy from the object map.
    pub(crate) remove_id: Option<Box<dyn FnMut(Id) + Send>>,
    /// Called as `bound_id(id, global_id)`.
    pub(crate) bound_id: Option<Box<dyn FnMut(Id, Id) + Send>>,
    /// Called as `add_mem(id, type_, fd, flags)`; the built-in handler is a `todo!()` stub.
    #[allow(unused)]
    pub(crate) add_mem: Option<Box<dyn FnMut(Id, u32, RawFd, u32) + Send>>,
    /// Called as `remove_mem(id)`; the built-in handler is a `todo!()` stub.
    #[allow(unused)]
    pub(crate) remove_mem: Option<Box<dyn FnMut(Id) + Send>>,
    /// Called as `bound_props(id, global_id, props)`.
    pub(crate) bound_props: Option<Box<dyn FnMut(Id, Id, &Properties) + Send>>,
}
402
403impl InnerCore {
404 fn new(context: &Context, mut properties: Properties) -> Self {
405 properties.add_dict(&context.properties_dict());
406
407 let client = context.protocol().new_client(None);
410 let connection = client.connection();
411
412 Self {
413 context: context.downgrade(),
414 properties,
415 client,
416 destroyed: RwLock::new(false),
417 proxy: RwLock::new(None),
418 objects: RwLock::new(IdMap::new()),
419 methods: Arc::new(Mutex::new(protocol::marshal::core::Methods::marshal(
420 connection,
421 ))),
422 hooks: spa::hook::HookList::new(),
423 }
424 }
425}