Skip to main content

clasp_bridge/
osc.rs

1//! OSC (Open Sound Control) bridge
2
3use async_trait::async_trait;
4use clasp_core::{Message, QoS, SetMessage, Value};
5use parking_lot::Mutex;
6use rosc::{OscMessage, OscPacket, OscType};
7use std::net::SocketAddr;
8use std::sync::Arc;
9use tokio::net::UdpSocket;
10use tokio::sync::mpsc;
11use tracing::{debug, error, info, warn};
12
13use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
14
/// Settings for the OSC side of an [`OscBridge`].
///
/// All addresses are kept as plain strings; they are only interpreted when
/// the bridge binds its socket or sends a packet.
#[derive(Debug, Clone)]
pub struct OscBridgeConfig {
    /// Local `host:port` the bridge's UDP socket is bound to.
    pub bind_addr: String,
    /// Destination `host:port` for outgoing OSC packets.
    ///
    /// `None` means no destination is configured.
    pub remote_addr: Option<String>,
    /// Prefix placed in front of incoming OSC addresses to form Clasp
    /// addresses.
    pub namespace: String,
}
25
26impl Default for OscBridgeConfig {
27    fn default() -> Self {
28        Self {
29            bind_addr: "0.0.0.0:8000".to_string(),
30            remote_addr: None,
31            namespace: "/osc".to_string(),
32        }
33    }
34}
35
36/// OSC to Clasp bridge
37pub struct OscBridge {
38    config: BridgeConfig,
39    osc_config: OscBridgeConfig,
40    socket: Option<Arc<UdpSocket>>,
41    running: Arc<Mutex<bool>>,
42}
43
44impl OscBridge {
45    pub fn new(osc_config: OscBridgeConfig) -> Self {
46        let config = BridgeConfig {
47            name: "OSC Bridge".to_string(),
48            protocol: "osc".to_string(),
49            bidirectional: true,
50            ..Default::default()
51        };
52
53        Self {
54            config,
55            osc_config,
56            socket: None,
57            running: Arc::new(Mutex::new(false)),
58        }
59    }
60
61    /// Convert OSC message to Clasp message
62    #[allow(dead_code)]
63    fn osc_to_clasp(&self, msg: &OscMessage) -> Option<Message> {
64        let address = format!("{}{}", self.osc_config.namespace, msg.addr);
65
66        // Convert OSC args to Clasp value
67        let value = if msg.args.is_empty() {
68            Value::Null
69        } else if msg.args.len() == 1 {
70            osc_arg_to_value(&msg.args[0])
71        } else {
72            Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
73        };
74
75        Some(Message::Set(SetMessage {
76            address,
77            value,
78            revision: None,
79            lock: false,
80            unlock: false,
81            ttl: None,
82        }))
83    }
84
85    /// Convert Clasp message to OSC
86    fn clasp_to_osc(&self, msg: &Message) -> Option<OscPacket> {
87        match msg {
88            Message::Set(set) => {
89                // Strip namespace prefix
90                let addr = set
91                    .address
92                    .strip_prefix(&self.osc_config.namespace)
93                    .unwrap_or(&set.address);
94
95                let args = value_to_osc_args(&set.value);
96
97                Some(OscPacket::Message(OscMessage {
98                    addr: addr.to_string(),
99                    args,
100                }))
101            }
102            Message::Publish(pub_msg) => {
103                let addr = pub_msg
104                    .address
105                    .strip_prefix(&self.osc_config.namespace)
106                    .unwrap_or(&pub_msg.address);
107
108                let args = if let Some(ref value) = pub_msg.value {
109                    value_to_osc_args(value)
110                } else if let Some(ref payload) = pub_msg.payload {
111                    value_to_osc_args(payload)
112                } else {
113                    vec![]
114                };
115
116                Some(OscPacket::Message(OscMessage {
117                    addr: addr.to_string(),
118                    args,
119                }))
120            }
121            _ => None,
122        }
123    }
124}
125
126#[async_trait]
127impl Bridge for OscBridge {
128    fn config(&self) -> &BridgeConfig {
129        &self.config
130    }
131
132    async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
133        if *self.running.lock() {
134            return Err(BridgeError::Other("Bridge already running".to_string()));
135        }
136
137        let socket = UdpSocket::bind(&self.osc_config.bind_addr)
138            .await
139            .map_err(|e| BridgeError::ConnectionFailed(e.to_string()))?;
140
141        info!("OSC bridge listening on {}", self.osc_config.bind_addr);
142
143        let socket = Arc::new(socket);
144        self.socket = Some(socket.clone());
145        *self.running.lock() = true;
146
147        let (tx, rx) = mpsc::channel(100);
148        let running = self.running.clone();
149        let namespace = self.osc_config.namespace.clone();
150
151        // Spawn receiver task
152        tokio::spawn(async move {
153            let mut buf = vec![0u8; 65536];
154
155            let _ = tx.send(BridgeEvent::Connected).await;
156
157            while *running.lock() {
158                match socket.recv_from(&mut buf).await {
159                    Ok((len, from)) => {
160                        debug!("OSC received {} bytes from {}", len, from);
161
162                        // Parse OSC packet
163                        match rosc::decoder::decode_udp(&buf[..len]) {
164                            Ok((_, packet)) => {
165                                if let Some(messages) = packet_to_messages(&packet, &namespace) {
166                                    for msg in messages {
167                                        if tx
168                                            .send(BridgeEvent::ToClasp(Box::new(msg)))
169                                            .await
170                                            .is_err()
171                                        {
172                                            break;
173                                        }
174                                    }
175                                }
176                            }
177                            Err(e) => {
178                                debug!("OSC decode error: {:?}", e);
179                            }
180                        }
181                    }
182                    Err(e) => {
183                        error!("OSC receive error: {}", e);
184                        let _ = tx.send(BridgeEvent::Error(e.to_string())).await;
185                    }
186                }
187            }
188
189            let _ = tx.send(BridgeEvent::Disconnected { reason: None }).await;
190        });
191
192        Ok(rx)
193    }
194
195    async fn stop(&mut self) -> Result<()> {
196        *self.running.lock() = false;
197        self.socket = None;
198        info!("OSC bridge stopped");
199        Ok(())
200    }
201
202    async fn send(&self, message: Message) -> Result<()> {
203        let socket = self
204            .socket
205            .as_ref()
206            .ok_or_else(|| BridgeError::ConnectionFailed("Not connected".to_string()))?;
207
208        let remote = self
209            .osc_config
210            .remote_addr
211            .as_ref()
212            .ok_or_else(|| BridgeError::Send("No remote address configured".to_string()))?;
213
214        let remote_addr: SocketAddr = remote
215            .parse()
216            .map_err(|e| BridgeError::Send(format!("Invalid remote address: {}", e)))?;
217
218        // Check for QoS degradation - OSC is always Fire (UDP, no guarantees)
219        let original_qos = message.default_qos();
220        if original_qos != QoS::Fire {
221            let address = match &message {
222                Message::Set(set) => &set.address,
223                Message::Publish(pub_msg) => &pub_msg.address,
224                _ => "",
225            };
226            warn!(
227                address = %address,
228                original_qos = ?original_qos,
229                "CLASP->OSC: QoS downgraded to Fire (UDP has no delivery guarantees)"
230            );
231        }
232
233        if let Some(packet) = self.clasp_to_osc(&message) {
234            let bytes = rosc::encoder::encode(&packet)
235                .map_err(|e| BridgeError::Protocol(format!("OSC encode error: {:?}", e)))?;
236
237            socket
238                .send_to(&bytes, remote_addr)
239                .await
240                .map_err(|e| BridgeError::Send(e.to_string()))?;
241
242            debug!("Sent OSC message to {}", remote_addr);
243        }
244
245        Ok(())
246    }
247
248    fn is_running(&self) -> bool {
249        *self.running.lock()
250    }
251
252    fn namespace(&self) -> &str {
253        &self.osc_config.namespace
254    }
255}
256
257/// Convert OSC argument to Clasp value
258fn osc_arg_to_value(arg: &OscType) -> Value {
259    match arg {
260        OscType::Int(i) => Value::Int(*i as i64),
261        OscType::Float(f) => Value::Float(*f as f64),
262        OscType::String(s) => Value::String(s.clone()),
263        OscType::Blob(b) => Value::Bytes(b.clone()),
264        OscType::Long(l) => Value::Int(*l),
265        OscType::Double(d) => Value::Float(*d),
266        OscType::Bool(b) => Value::Bool(*b),
267        OscType::Nil => Value::Null,
268        OscType::Inf => Value::Float(f64::INFINITY),
269        _ => Value::Null,
270    }
271}
272
273/// Convert Clasp value to OSC arguments
274fn value_to_osc_args(value: &Value) -> Vec<OscType> {
275    match value {
276        Value::Null => vec![],
277        Value::Bool(b) => vec![OscType::Bool(*b)],
278        Value::Int(i) => vec![OscType::Long(*i)],
279        Value::Float(f) => vec![OscType::Double(*f)],
280        Value::String(s) => vec![OscType::String(s.clone())],
281        Value::Bytes(b) => vec![OscType::Blob(b.clone())],
282        Value::Array(arr) => arr.iter().flat_map(value_to_osc_args).collect(),
283        Value::Map(_) => vec![OscType::String(
284            serde_json::to_string(value).unwrap_or_default(),
285        )],
286    }
287}
288
289/// Convert OSC packet to Clasp messages
290fn packet_to_messages(packet: &OscPacket, namespace: &str) -> Option<Vec<Message>> {
291    match packet {
292        OscPacket::Message(msg) => {
293            let address = format!("{}{}", namespace, msg.addr);
294            let value = if msg.args.is_empty() {
295                Value::Null
296            } else if msg.args.len() == 1 {
297                osc_arg_to_value(&msg.args[0])
298            } else {
299                Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
300            };
301
302            Some(vec![Message::Set(SetMessage {
303                address,
304                value,
305                revision: None,
306                lock: false,
307                unlock: false,
308                ttl: None,
309            })])
310        }
311        OscPacket::Bundle(bundle) => {
312            let messages: Vec<Message> = bundle
313                .content
314                .iter()
315                .filter_map(|p| packet_to_messages(p, namespace))
316                .flatten()
317                .collect();
318
319            if messages.is_empty() {
320                None
321            } else {
322                // Wrap in bundle with timestamp
323                let timestamp = bundle.timetag.seconds as u64 * 1_000_000
324                    + (bundle.timetag.fractional as u64 * 1_000_000 / u32::MAX as u64);
325
326                Some(vec![Message::Bundle(clasp_core::BundleMessage {
327                    timestamp: Some(timestamp),
328                    messages,
329                })])
330            }
331        }
332    }
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Scalar OSC arguments map onto the matching Clasp value variants.
    #[test]
    fn test_osc_arg_conversion() {
        let from_int = osc_arg_to_value(&OscType::Int(42));
        assert_eq!(from_int, Value::Int(42));

        let from_float = osc_arg_to_value(&OscType::Float(0.5));
        assert_eq!(from_float, Value::Float(0.5));

        let text = "test".to_string();
        let from_string = osc_arg_to_value(&OscType::String(text.clone()));
        assert_eq!(from_string, Value::String(text));
    }

    /// A Clasp float is sent as exactly one OSC double.
    #[test]
    fn test_value_to_osc() {
        let args = value_to_osc_args(&Value::Float(0.75));
        assert_eq!(args.len(), 1);

        if let OscType::Double(f) = &args[0] {
            assert!((f - 0.75).abs() < 0.001);
        } else {
            panic!("Expected Double");
        }
    }
}