pipewire_native/proxy/
node.rs

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: Copyright (c) 2025 Asymptotic Inc.
// SPDX-FileCopyrightText: Copyright (c) 2025 Arun Raghavan
4
5use std::sync::{Arc, Mutex, RwLock};
6
7use bitflags::bitflags;
8use pipewire_native_macros as macros;
9use pipewire_native_spa as spa;
10
11use crate::{
12    core::Core,
13    new_refcounted,
14    properties::Properties,
15    protocol,
16    proxy::{HasProxy, Proxy},
17    proxy_object_invoke, refcounted,
18    types::{self, params::ParamBuilder},
19    HookId, Id, Refcounted,
20};
21
22refcounted! {
23    /// Proxy that represents a node that is connected to the server.
24    pub struct Node {
25        proxy: RwLock<Option<Proxy<Node>>>,
26        methods: Arc<Mutex<NodeMethods<Node>>>,
27        hooks: Arc<Mutex<spa::hook::HookList<NodeEvents>>>,
28    }
29}
30
31#[allow(clippy::type_complexity)]
32pub(crate) struct NodeMethods<T: HasProxy + Refcounted> {
33    pub(crate) subscribe_params:
34        Box<dyn FnMut(&Proxy<T>, &[spa::param::ParamType]) -> std::io::Result<()>>,
35    pub(crate) enum_params: Box<
36        dyn FnMut(
37            &Proxy<T>,
38            u32,
39            Option<spa::param::ParamType>,
40            u32,
41            u32,
42            Option<ParamBuilder>,
43        ) -> std::io::Result<()>,
44    >,
45    pub(crate) set_param: Box<
46        dyn FnMut(
47            &Proxy<T>,
48            spa::param::ParamType,
49            spa::pod::types::ObjectType,
50            u32,
51            Box<dyn FnOnce(spa::pod::builder::ObjectBuilder) -> spa::pod::builder::ObjectBuilder>,
52        ) -> std::io::Result<()>,
53    >,
54    pub(crate) send_command: Box<
55        dyn FnMut(
56            &Proxy<T>,
57            Box<dyn FnOnce(spa::pod::builder::Builder) -> spa::pod::builder::Builder>,
58        ) -> std::io::Result<()>,
59    >,
60}
61
62bitflags! {
63    /// A bit mask of changes signalled in the [NodeEvents::info] event.
64    #[repr(C)]
65    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
66    pub struct NodeChangeMask : u32 {
67        /// Input ports changed.
68        const INPUT_PORTS = (1 << 0);
69        /// Output ports changed.
70        const OUTPUT_PORTS = (1 << 1);
71        /// Node state changed.
72        const STATE = (1 << 2);
73        /// Node properties changed.
74        const PROPS = (1 << 3);
75        /// Node params changed.
76        const PARAMS = (1 << 4);
77    }
78}
79
80#[repr(u32)]
81#[derive(Clone, Copy, Debug, Eq, PartialEq, macros::EnumU32)]
82/// Represents the current state of a node.
83pub enum NodeState {
84    /// Node is in an error state.
85    Error,
86    /// Node is being created.
87    Creating,
88    /// Node is suspended.
89    Suspended,
90    /// Node is running, but no port is active.
91    Idle,
92    /// Node is running.
93    Running,
94}
95
96/// Node information that is provided in a [NodeEvents::info] event.
97pub struct NodeInfo<'a> {
98    /// The ID of the node.
99    pub id: Id,
100    /// Maximum number of input ports.
101    pub max_input_ports: u32,
102    /// Maximum number of output ports.
103    pub max_output_ports: u32,
104    /// What changed since the last call.
105    pub mask: NodeChangeMask,
106    /// Number of input ports.
107    pub n_input_ports: u32,
108    /// Number of output ports.
109    pub n_output_ports: u32,
110    /// The node's current state.
111    pub state: NodeState,
112    /// The error reason if `state` is [NodeState::Error].
113    pub error: Option<&'a str>,
114    /// The node's properties.
115    pub props: &'a Properties,
116    /// Node parameters that changed.
117    pub params: &'a [(spa::param::ParamType, spa::param::ParamInfoFlags)],
118}
119
120/// Node events that can be subscribed to.
121#[allow(clippy::type_complexity)]
122#[derive(Default)]
123pub struct NodeEvents {
124    /// Node information became available, or changed.
125    pub info: Option<Box<dyn FnMut(&NodeInfo<'_>) + Send>>,
126    /// Node permissions, notified due to a [Node::subscribe_params] or [Node::enum_params]
127    /// call.
128    pub param:
129        Option<Box<dyn FnMut(u32, spa::param::ParamType, u32, u32, &spa::pod::RawPodOwned) + Send>>,
130}
131
132impl HasProxy for Node {
133    fn type_(&self) -> types::ObjectType {
134        types::interface::NODE
135    }
136
137    fn version(&self) -> u32 {
138        3
139    }
140
141    fn proxy(&self) -> Proxy<Self> {
142        self.inner
143            .proxy
144            .read()
145            .unwrap()
146            .as_ref()
147            .expect("Node proxy should be initialised on creation")
148            .clone()
149    }
150}
151
152impl Node {
153    pub(crate) fn new(core: &Core) -> Self {
154        let this = Self {
155            inner: new_refcounted(InnerNode::new(core)),
156        };
157
158        let id = core.next_proxy_id();
159        this.inner
160            .proxy
161            .write()
162            .unwrap()
163            .replace(Proxy::new(id, &this));
164        core.add_proxy(&this, id);
165
166        this
167    }
168
169    /// Register for notifications of node events.
170    pub fn add_listener(&self, events: NodeEvents) -> HookId {
171        self.inner.hooks.lock().unwrap().append(events)
172    }
173
174    /// Remove a set of event listeners.
175    pub fn remove_listener(&self, hook_id: HookId) {
176        self.inner.hooks.lock().unwrap().remove(hook_id);
177    }
178
179    /// Register for notifications of the specified param types.
180    pub fn subscribe_params(&self, ids: &[spa::param::ParamType]) -> std::io::Result<()> {
181        let proxy = self.proxy();
182        proxy_object_invoke!(proxy, subscribe_params, ids)
183    }
184
185    /// Enumerate params (via [NodeEvents::param]). Set `id` to [None] to query all param types.
186    pub fn enum_params(
187        &self,
188        seq: u32,
189        id: Option<spa::param::ParamType>,
190        start: u32,
191        num: u32,
192        filter: Option<ParamBuilder>,
193    ) -> std::io::Result<()> {
194        let proxy = self.proxy();
195        proxy_object_invoke!(proxy, enum_params, seq, id, start, num, filter)
196    }
197
198    /// Set a parameter on the node.
199    pub fn set_param(
200        &self,
201        param_id: spa::param::ParamType,
202        object_type: spa::pod::types::ObjectType,
203        flags: u32,
204        builder: Box<
205            dyn FnOnce(spa::pod::builder::ObjectBuilder) -> spa::pod::builder::ObjectBuilder,
206        >,
207    ) -> std::io::Result<()> {
208        let proxy = self.proxy();
209        proxy_object_invoke!(proxy, set_param, param_id, object_type, flags, builder)
210    }
211
212    /// Send a command to the node.
213    pub fn send_command(
214        &self,
215        builder: Box<dyn FnOnce(spa::pod::builder::Builder) -> spa::pod::builder::Builder>,
216    ) -> std::io::Result<()> {
217        let proxy = self.proxy();
218        proxy_object_invoke!(proxy, send_command, builder)
219    }
220
221    pub(crate) fn methods(&self) -> Arc<Mutex<NodeMethods<Node>>> {
222        self.inner.methods.clone()
223    }
224
225    pub(crate) fn events(&self) -> Arc<Mutex<spa::hook::HookList<NodeEvents>>> {
226        self.inner.hooks.clone()
227    }
228}
229
230impl InnerNode {
231    fn new(core: &Core) -> Self {
232        Self {
233            proxy: RwLock::new(None),
234            methods: Arc::new(Mutex::new(protocol::marshal::node::Methods::marshal(
235                core.connection(),
236            ))),
237            hooks: spa::hook::HookList::new(),
238        }
239    }
240}