Skip to main content

clasp_bridge/
osc.rs

1//! OSC (Open Sound Control) bridge
2
3use async_trait::async_trait;
4use clasp_core::{Message, QoS, SetMessage, Value};
5use parking_lot::Mutex;
6use rosc::{OscMessage, OscPacket, OscType};
7use std::net::SocketAddr;
8use std::sync::Arc;
9use tokio::net::UdpSocket;
10use tokio::sync::mpsc;
11use tracing::{debug, error, info, warn};
12
13use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
14
15/// OSC bridge configuration
16#[derive(Debug, Clone)]
17pub struct OscBridgeConfig {
18    /// Local address to bind
19    pub bind_addr: String,
20    /// Remote address to send to (optional)
21    pub remote_addr: Option<String>,
22    /// Address prefix for Clasp
23    pub namespace: String,
24}
25
26impl Default for OscBridgeConfig {
27    fn default() -> Self {
28        Self {
29            bind_addr: "0.0.0.0:8000".to_string(),
30            remote_addr: None,
31            namespace: "/osc".to_string(),
32        }
33    }
34}
35
36/// OSC to Clasp bridge
37pub struct OscBridge {
38    config: BridgeConfig,
39    osc_config: OscBridgeConfig,
40    socket: Option<Arc<UdpSocket>>,
41    running: Arc<Mutex<bool>>,
42}
43
44impl OscBridge {
45    pub fn new(osc_config: OscBridgeConfig) -> Self {
46        let config = BridgeConfig {
47            name: "OSC Bridge".to_string(),
48            protocol: "osc".to_string(),
49            bidirectional: true,
50            ..Default::default()
51        };
52
53        Self {
54            config,
55            osc_config,
56            socket: None,
57            running: Arc::new(Mutex::new(false)),
58        }
59    }
60
61    /// Convert OSC message to Clasp message
62    #[allow(dead_code)]
63    fn osc_to_clasp(&self, msg: &OscMessage) -> Option<Message> {
64        let address = format!("{}{}", self.osc_config.namespace, msg.addr);
65
66        // Convert OSC args to Clasp value
67        let value = if msg.args.is_empty() {
68            Value::Null
69        } else if msg.args.len() == 1 {
70            osc_arg_to_value(&msg.args[0])
71        } else {
72            Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
73        };
74
75        Some(Message::Set(SetMessage {
76            address,
77            value,
78            revision: None,
79            lock: false,
80            unlock: false,
81        }))
82    }
83
84    /// Convert Clasp message to OSC
85    fn clasp_to_osc(&self, msg: &Message) -> Option<OscPacket> {
86        match msg {
87            Message::Set(set) => {
88                // Strip namespace prefix
89                let addr = set
90                    .address
91                    .strip_prefix(&self.osc_config.namespace)
92                    .unwrap_or(&set.address);
93
94                let args = value_to_osc_args(&set.value);
95
96                Some(OscPacket::Message(OscMessage {
97                    addr: addr.to_string(),
98                    args,
99                }))
100            }
101            Message::Publish(pub_msg) => {
102                let addr = pub_msg
103                    .address
104                    .strip_prefix(&self.osc_config.namespace)
105                    .unwrap_or(&pub_msg.address);
106
107                let args = if let Some(ref value) = pub_msg.value {
108                    value_to_osc_args(value)
109                } else if let Some(ref payload) = pub_msg.payload {
110                    value_to_osc_args(payload)
111                } else {
112                    vec![]
113                };
114
115                Some(OscPacket::Message(OscMessage {
116                    addr: addr.to_string(),
117                    args,
118                }))
119            }
120            _ => None,
121        }
122    }
123}
124
125#[async_trait]
126impl Bridge for OscBridge {
127    fn config(&self) -> &BridgeConfig {
128        &self.config
129    }
130
131    async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
132        if *self.running.lock() {
133            return Err(BridgeError::Other("Bridge already running".to_string()));
134        }
135
136        let socket = UdpSocket::bind(&self.osc_config.bind_addr)
137            .await
138            .map_err(|e| BridgeError::ConnectionFailed(e.to_string()))?;
139
140        info!("OSC bridge listening on {}", self.osc_config.bind_addr);
141
142        let socket = Arc::new(socket);
143        self.socket = Some(socket.clone());
144        *self.running.lock() = true;
145
146        let (tx, rx) = mpsc::channel(100);
147        let running = self.running.clone();
148        let namespace = self.osc_config.namespace.clone();
149
150        // Spawn receiver task
151        tokio::spawn(async move {
152            let mut buf = vec![0u8; 65536];
153
154            let _ = tx.send(BridgeEvent::Connected).await;
155
156            while *running.lock() {
157                match socket.recv_from(&mut buf).await {
158                    Ok((len, from)) => {
159                        debug!("OSC received {} bytes from {}", len, from);
160
161                        // Parse OSC packet
162                        match rosc::decoder::decode_udp(&buf[..len]) {
163                            Ok((_, packet)) => {
164                                if let Some(messages) = packet_to_messages(&packet, &namespace) {
165                                    for msg in messages {
166                                        if tx
167                                            .send(BridgeEvent::ToClasp(Box::new(msg)))
168                                            .await
169                                            .is_err()
170                                        {
171                                            break;
172                                        }
173                                    }
174                                }
175                            }
176                            Err(e) => {
177                                debug!("OSC decode error: {:?}", e);
178                            }
179                        }
180                    }
181                    Err(e) => {
182                        error!("OSC receive error: {}", e);
183                        let _ = tx.send(BridgeEvent::Error(e.to_string())).await;
184                    }
185                }
186            }
187
188            let _ = tx.send(BridgeEvent::Disconnected { reason: None }).await;
189        });
190
191        Ok(rx)
192    }
193
194    async fn stop(&mut self) -> Result<()> {
195        *self.running.lock() = false;
196        self.socket = None;
197        info!("OSC bridge stopped");
198        Ok(())
199    }
200
201    async fn send(&self, message: Message) -> Result<()> {
202        let socket = self
203            .socket
204            .as_ref()
205            .ok_or_else(|| BridgeError::ConnectionFailed("Not connected".to_string()))?;
206
207        let remote = self
208            .osc_config
209            .remote_addr
210            .as_ref()
211            .ok_or_else(|| BridgeError::Send("No remote address configured".to_string()))?;
212
213        let remote_addr: SocketAddr = remote
214            .parse()
215            .map_err(|e| BridgeError::Send(format!("Invalid remote address: {}", e)))?;
216
217        // Check for QoS degradation - OSC is always Fire (UDP, no guarantees)
218        let original_qos = message.default_qos();
219        if original_qos != QoS::Fire {
220            let address = match &message {
221                Message::Set(set) => &set.address,
222                Message::Publish(pub_msg) => &pub_msg.address,
223                _ => "",
224            };
225            warn!(
226                address = %address,
227                original_qos = ?original_qos,
228                "CLASP->OSC: QoS downgraded to Fire (UDP has no delivery guarantees)"
229            );
230        }
231
232        if let Some(packet) = self.clasp_to_osc(&message) {
233            let bytes = rosc::encoder::encode(&packet)
234                .map_err(|e| BridgeError::Protocol(format!("OSC encode error: {:?}", e)))?;
235
236            socket
237                .send_to(&bytes, remote_addr)
238                .await
239                .map_err(|e| BridgeError::Send(e.to_string()))?;
240
241            debug!("Sent OSC message to {}", remote_addr);
242        }
243
244        Ok(())
245    }
246
247    fn is_running(&self) -> bool {
248        *self.running.lock()
249    }
250
251    fn namespace(&self) -> &str {
252        &self.osc_config.namespace
253    }
254}
255
256/// Convert OSC argument to Clasp value
257fn osc_arg_to_value(arg: &OscType) -> Value {
258    match arg {
259        OscType::Int(i) => Value::Int(*i as i64),
260        OscType::Float(f) => Value::Float(*f as f64),
261        OscType::String(s) => Value::String(s.clone()),
262        OscType::Blob(b) => Value::Bytes(b.clone()),
263        OscType::Long(l) => Value::Int(*l),
264        OscType::Double(d) => Value::Float(*d),
265        OscType::Bool(b) => Value::Bool(*b),
266        OscType::Nil => Value::Null,
267        OscType::Inf => Value::Float(f64::INFINITY),
268        _ => Value::Null,
269    }
270}
271
272/// Convert Clasp value to OSC arguments
273fn value_to_osc_args(value: &Value) -> Vec<OscType> {
274    match value {
275        Value::Null => vec![],
276        Value::Bool(b) => vec![OscType::Bool(*b)],
277        Value::Int(i) => vec![OscType::Long(*i)],
278        Value::Float(f) => vec![OscType::Double(*f)],
279        Value::String(s) => vec![OscType::String(s.clone())],
280        Value::Bytes(b) => vec![OscType::Blob(b.clone())],
281        Value::Array(arr) => arr.iter().flat_map(value_to_osc_args).collect(),
282        Value::Map(_) => vec![OscType::String(
283            serde_json::to_string(value).unwrap_or_default(),
284        )],
285    }
286}
287
288/// Convert OSC packet to Clasp messages
289fn packet_to_messages(packet: &OscPacket, namespace: &str) -> Option<Vec<Message>> {
290    match packet {
291        OscPacket::Message(msg) => {
292            let address = format!("{}{}", namespace, msg.addr);
293            let value = if msg.args.is_empty() {
294                Value::Null
295            } else if msg.args.len() == 1 {
296                osc_arg_to_value(&msg.args[0])
297            } else {
298                Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
299            };
300
301            Some(vec![Message::Set(SetMessage {
302                address,
303                value,
304                revision: None,
305                lock: false,
306                unlock: false,
307            })])
308        }
309        OscPacket::Bundle(bundle) => {
310            let messages: Vec<Message> = bundle
311                .content
312                .iter()
313                .filter_map(|p| packet_to_messages(p, namespace))
314                .flatten()
315                .collect();
316
317            if messages.is_empty() {
318                None
319            } else {
320                // Wrap in bundle with timestamp
321                let timestamp = bundle.timetag.seconds as u64 * 1_000_000
322                    + (bundle.timetag.fractional as u64 * 1_000_000 / u32::MAX as u64);
323
324                Some(vec![Message::Bundle(clasp_core::BundleMessage {
325                    timestamp: Some(timestamp),
326                    messages,
327                })])
328            }
329        }
330    }
331}
332
333#[cfg(test)]
334mod tests {
335    use super::*;
336
337    #[test]
338    fn test_osc_arg_conversion() {
339        assert_eq!(osc_arg_to_value(&OscType::Int(42)), Value::Int(42));
340        assert_eq!(osc_arg_to_value(&OscType::Float(0.5)), Value::Float(0.5));
341        assert_eq!(
342            osc_arg_to_value(&OscType::String("test".to_string())),
343            Value::String("test".to_string())
344        );
345    }
346
347    #[test]
348    fn test_value_to_osc() {
349        let args = value_to_osc_args(&Value::Float(0.75));
350        assert_eq!(args.len(), 1);
351        match &args[0] {
352            OscType::Double(f) => assert!((f - 0.75).abs() < 0.001),
353            _ => panic!("Expected Double"),
354        }
355    }
356}