Skip to main content

clasp_bridge/
osc.rs

1//! OSC (Open Sound Control) bridge
2
3use async_trait::async_trait;
4use clasp_core::{Message, PublishMessage, QoS, SetMessage, SignalType, Value};
5use parking_lot::Mutex;
6use rosc::{OscMessage, OscPacket, OscType};
7use std::net::SocketAddr;
8use std::sync::Arc;
9use tokio::net::UdpSocket;
10use tokio::sync::mpsc;
11use tracing::{debug, error, info, warn};
12
13use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
14
15/// OSC bridge configuration
16#[derive(Debug, Clone)]
17pub struct OscBridgeConfig {
18    /// Local address to bind
19    pub bind_addr: String,
20    /// Remote address to send to (optional)
21    pub remote_addr: Option<String>,
22    /// Address prefix for Clasp
23    pub namespace: String,
24}
25
26impl Default for OscBridgeConfig {
27    fn default() -> Self {
28        Self {
29            bind_addr: "0.0.0.0:8000".to_string(),
30            remote_addr: None,
31            namespace: "/osc".to_string(),
32        }
33    }
34}
35
36/// OSC to Clasp bridge
37pub struct OscBridge {
38    config: BridgeConfig,
39    osc_config: OscBridgeConfig,
40    socket: Option<Arc<UdpSocket>>,
41    running: Arc<Mutex<bool>>,
42}
43
44impl OscBridge {
45    pub fn new(osc_config: OscBridgeConfig) -> Self {
46        let config = BridgeConfig {
47            name: "OSC Bridge".to_string(),
48            protocol: "osc".to_string(),
49            bidirectional: true,
50            ..Default::default()
51        };
52
53        Self {
54            config,
55            osc_config,
56            socket: None,
57            running: Arc::new(Mutex::new(false)),
58        }
59    }
60
61    /// Convert OSC message to Clasp message
62    fn osc_to_clasp(&self, msg: &OscMessage) -> Option<Message> {
63        let address = format!("{}{}", self.osc_config.namespace, msg.addr);
64
65        // Convert OSC args to Clasp value
66        let value = if msg.args.is_empty() {
67            Value::Null
68        } else if msg.args.len() == 1 {
69            osc_arg_to_value(&msg.args[0])
70        } else {
71            Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
72        };
73
74        Some(Message::Set(SetMessage {
75            address,
76            value,
77            revision: None,
78            lock: false,
79            unlock: false,
80        }))
81    }
82
83    /// Convert Clasp message to OSC
84    fn clasp_to_osc(&self, msg: &Message) -> Option<OscPacket> {
85        match msg {
86            Message::Set(set) => {
87                // Strip namespace prefix
88                let addr = set
89                    .address
90                    .strip_prefix(&self.osc_config.namespace)
91                    .unwrap_or(&set.address);
92
93                let args = value_to_osc_args(&set.value);
94
95                Some(OscPacket::Message(OscMessage {
96                    addr: addr.to_string(),
97                    args,
98                }))
99            }
100            Message::Publish(pub_msg) => {
101                let addr = pub_msg
102                    .address
103                    .strip_prefix(&self.osc_config.namespace)
104                    .unwrap_or(&pub_msg.address);
105
106                let args = if let Some(ref value) = pub_msg.value {
107                    value_to_osc_args(value)
108                } else if let Some(ref payload) = pub_msg.payload {
109                    value_to_osc_args(payload)
110                } else {
111                    vec![]
112                };
113
114                Some(OscPacket::Message(OscMessage {
115                    addr: addr.to_string(),
116                    args,
117                }))
118            }
119            _ => None,
120        }
121    }
122}
123
124#[async_trait]
125impl Bridge for OscBridge {
126    fn config(&self) -> &BridgeConfig {
127        &self.config
128    }
129
130    async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
131        if *self.running.lock() {
132            return Err(BridgeError::Other("Bridge already running".to_string()));
133        }
134
135        let socket = UdpSocket::bind(&self.osc_config.bind_addr)
136            .await
137            .map_err(|e| BridgeError::ConnectionFailed(e.to_string()))?;
138
139        info!("OSC bridge listening on {}", self.osc_config.bind_addr);
140
141        let socket = Arc::new(socket);
142        self.socket = Some(socket.clone());
143        *self.running.lock() = true;
144
145        let (tx, rx) = mpsc::channel(100);
146        let running = self.running.clone();
147        let namespace = self.osc_config.namespace.clone();
148
149        // Spawn receiver task
150        tokio::spawn(async move {
151            let mut buf = vec![0u8; 65536];
152
153            let _ = tx.send(BridgeEvent::Connected).await;
154
155            while *running.lock() {
156                match socket.recv_from(&mut buf).await {
157                    Ok((len, from)) => {
158                        debug!("OSC received {} bytes from {}", len, from);
159
160                        // Parse OSC packet
161                        match rosc::decoder::decode_udp(&buf[..len]) {
162                            Ok((_, packet)) => {
163                                if let Some(messages) = packet_to_messages(&packet, &namespace) {
164                                    for msg in messages {
165                                        if tx.send(BridgeEvent::ToClasp(msg)).await.is_err() {
166                                            break;
167                                        }
168                                    }
169                                }
170                            }
171                            Err(e) => {
172                                debug!("OSC decode error: {:?}", e);
173                            }
174                        }
175                    }
176                    Err(e) => {
177                        error!("OSC receive error: {}", e);
178                        let _ = tx.send(BridgeEvent::Error(e.to_string())).await;
179                    }
180                }
181            }
182
183            let _ = tx.send(BridgeEvent::Disconnected { reason: None }).await;
184        });
185
186        Ok(rx)
187    }
188
189    async fn stop(&mut self) -> Result<()> {
190        *self.running.lock() = false;
191        self.socket = None;
192        info!("OSC bridge stopped");
193        Ok(())
194    }
195
196    async fn send(&self, message: Message) -> Result<()> {
197        let socket = self
198            .socket
199            .as_ref()
200            .ok_or_else(|| BridgeError::ConnectionFailed("Not connected".to_string()))?;
201
202        let remote = self
203            .osc_config
204            .remote_addr
205            .as_ref()
206            .ok_or_else(|| BridgeError::Send("No remote address configured".to_string()))?;
207
208        let remote_addr: SocketAddr = remote
209            .parse()
210            .map_err(|e| BridgeError::Send(format!("Invalid remote address: {}", e)))?;
211
212        // Check for QoS degradation - OSC is always Fire (UDP, no guarantees)
213        let original_qos = message.default_qos();
214        if original_qos != QoS::Fire {
215            let address = match &message {
216                Message::Set(set) => &set.address,
217                Message::Publish(pub_msg) => &pub_msg.address,
218                _ => "",
219            };
220            warn!(
221                address = %address,
222                original_qos = ?original_qos,
223                "CLASP->OSC: QoS downgraded to Fire (UDP has no delivery guarantees)"
224            );
225        }
226
227        if let Some(packet) = self.clasp_to_osc(&message) {
228            let bytes = rosc::encoder::encode(&packet)
229                .map_err(|e| BridgeError::Protocol(format!("OSC encode error: {:?}", e)))?;
230
231            socket
232                .send_to(&bytes, remote_addr)
233                .await
234                .map_err(|e| BridgeError::Send(e.to_string()))?;
235
236            debug!("Sent OSC message to {}", remote_addr);
237        }
238
239        Ok(())
240    }
241
242    fn is_running(&self) -> bool {
243        *self.running.lock()
244    }
245
246    fn namespace(&self) -> &str {
247        &self.osc_config.namespace
248    }
249}
250
251/// Convert OSC argument to Clasp value
252fn osc_arg_to_value(arg: &OscType) -> Value {
253    match arg {
254        OscType::Int(i) => Value::Int(*i as i64),
255        OscType::Float(f) => Value::Float(*f as f64),
256        OscType::String(s) => Value::String(s.clone()),
257        OscType::Blob(b) => Value::Bytes(b.clone()),
258        OscType::Long(l) => Value::Int(*l),
259        OscType::Double(d) => Value::Float(*d),
260        OscType::Bool(b) => Value::Bool(*b),
261        OscType::Nil => Value::Null,
262        OscType::Inf => Value::Float(f64::INFINITY),
263        _ => Value::Null,
264    }
265}
266
267/// Convert Clasp value to OSC arguments
268fn value_to_osc_args(value: &Value) -> Vec<OscType> {
269    match value {
270        Value::Null => vec![],
271        Value::Bool(b) => vec![OscType::Bool(*b)],
272        Value::Int(i) => vec![OscType::Long(*i)],
273        Value::Float(f) => vec![OscType::Double(*f)],
274        Value::String(s) => vec![OscType::String(s.clone())],
275        Value::Bytes(b) => vec![OscType::Blob(b.clone())],
276        Value::Array(arr) => arr.iter().flat_map(value_to_osc_args).collect(),
277        Value::Map(_) => vec![OscType::String(
278            serde_json::to_string(value).unwrap_or_default(),
279        )],
280    }
281}
282
283/// Convert OSC packet to Clasp messages
284fn packet_to_messages(packet: &OscPacket, namespace: &str) -> Option<Vec<Message>> {
285    match packet {
286        OscPacket::Message(msg) => {
287            let address = format!("{}{}", namespace, msg.addr);
288            let value = if msg.args.is_empty() {
289                Value::Null
290            } else if msg.args.len() == 1 {
291                osc_arg_to_value(&msg.args[0])
292            } else {
293                Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
294            };
295
296            Some(vec![Message::Set(SetMessage {
297                address,
298                value,
299                revision: None,
300                lock: false,
301                unlock: false,
302            })])
303        }
304        OscPacket::Bundle(bundle) => {
305            let messages: Vec<Message> = bundle
306                .content
307                .iter()
308                .filter_map(|p| packet_to_messages(p, namespace))
309                .flatten()
310                .collect();
311
312            if messages.is_empty() {
313                None
314            } else {
315                // Wrap in bundle with timestamp
316                let timestamp = bundle.timetag.seconds as u64 * 1_000_000
317                    + (bundle.timetag.fractional as u64 * 1_000_000 / u32::MAX as u64);
318
319                Some(vec![Message::Bundle(clasp_core::BundleMessage {
320                    timestamp: Some(timestamp),
321                    messages,
322                })])
323            }
324        }
325    }
326}
327
328#[cfg(test)]
329mod tests {
330    use super::*;
331
332    #[test]
333    fn test_osc_arg_conversion() {
334        assert_eq!(osc_arg_to_value(&OscType::Int(42)), Value::Int(42));
335        assert_eq!(osc_arg_to_value(&OscType::Float(0.5)), Value::Float(0.5));
336        assert_eq!(
337            osc_arg_to_value(&OscType::String("test".to_string())),
338            Value::String("test".to_string())
339        );
340    }
341
342    #[test]
343    fn test_value_to_osc() {
344        let args = value_to_osc_args(&Value::Float(0.75));
345        assert_eq!(args.len(), 1);
346        match &args[0] {
347            OscType::Double(f) => assert!((f - 0.75).abs() < 0.001),
348            _ => panic!("Expected Double"),
349        }
350    }
351}