1use async_trait::async_trait;
4use clasp_core::{Message, QoS, SetMessage, Value};
5use parking_lot::Mutex;
6use rosc::{OscMessage, OscPacket, OscType};
7use std::net::SocketAddr;
8use std::sync::Arc;
9use tokio::net::UdpSocket;
10use tokio::sync::mpsc;
11use tracing::{debug, error, info, warn};
12
13use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
14
/// Settings for the OSC side of the bridge.
#[derive(Clone, Debug)]
pub struct OscBridgeConfig {
    /// Local address the UDP socket is bound to (for example `"0.0.0.0:8000"`).
    pub bind_addr: String,
    /// Destination for outgoing OSC packets. Sending fails while this is
    /// `None`; the value is parsed as a `SocketAddr` on every send.
    pub remote_addr: Option<String>,
    /// Prefix put in front of incoming OSC addresses to form CLASP addresses,
    /// and removed again from outgoing ones.
    pub namespace: String,
}
25
26impl Default for OscBridgeConfig {
27 fn default() -> Self {
28 Self {
29 bind_addr: "0.0.0.0:8000".to_string(),
30 remote_addr: None,
31 namespace: "/osc".to_string(),
32 }
33 }
34}
35
36pub struct OscBridge {
38 config: BridgeConfig,
39 osc_config: OscBridgeConfig,
40 socket: Option<Arc<UdpSocket>>,
41 running: Arc<Mutex<bool>>,
42}
43
44impl OscBridge {
45 pub fn new(osc_config: OscBridgeConfig) -> Self {
46 let config = BridgeConfig {
47 name: "OSC Bridge".to_string(),
48 protocol: "osc".to_string(),
49 bidirectional: true,
50 ..Default::default()
51 };
52
53 Self {
54 config,
55 osc_config,
56 socket: None,
57 running: Arc::new(Mutex::new(false)),
58 }
59 }
60
61 #[allow(dead_code)]
63 fn osc_to_clasp(&self, msg: &OscMessage) -> Option<Message> {
64 let address = format!("{}{}", self.osc_config.namespace, msg.addr);
65
66 let value = if msg.args.is_empty() {
68 Value::Null
69 } else if msg.args.len() == 1 {
70 osc_arg_to_value(&msg.args[0])
71 } else {
72 Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
73 };
74
75 Some(Message::Set(SetMessage {
76 address,
77 value,
78 revision: None,
79 lock: false,
80 unlock: false,
81 ttl: None,
82 }))
83 }
84
85 fn clasp_to_osc(&self, msg: &Message) -> Option<OscPacket> {
87 match msg {
88 Message::Set(set) => {
89 let addr = set
91 .address
92 .strip_prefix(&self.osc_config.namespace)
93 .unwrap_or(&set.address);
94
95 let args = value_to_osc_args(&set.value);
96
97 Some(OscPacket::Message(OscMessage {
98 addr: addr.to_string(),
99 args,
100 }))
101 }
102 Message::Publish(pub_msg) => {
103 let addr = pub_msg
104 .address
105 .strip_prefix(&self.osc_config.namespace)
106 .unwrap_or(&pub_msg.address);
107
108 let args = if let Some(ref value) = pub_msg.value {
109 value_to_osc_args(value)
110 } else if let Some(ref payload) = pub_msg.payload {
111 value_to_osc_args(payload)
112 } else {
113 vec![]
114 };
115
116 Some(OscPacket::Message(OscMessage {
117 addr: addr.to_string(),
118 args,
119 }))
120 }
121 _ => None,
122 }
123 }
124}
125
126#[async_trait]
127impl Bridge for OscBridge {
128 fn config(&self) -> &BridgeConfig {
129 &self.config
130 }
131
132 async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
133 if *self.running.lock() {
134 return Err(BridgeError::Other("Bridge already running".to_string()));
135 }
136
137 let socket = UdpSocket::bind(&self.osc_config.bind_addr)
138 .await
139 .map_err(|e| BridgeError::ConnectionFailed(e.to_string()))?;
140
141 info!("OSC bridge listening on {}", self.osc_config.bind_addr);
142
143 let socket = Arc::new(socket);
144 self.socket = Some(socket.clone());
145 *self.running.lock() = true;
146
147 let (tx, rx) = mpsc::channel(100);
148 let running = self.running.clone();
149 let namespace = self.osc_config.namespace.clone();
150
151 tokio::spawn(async move {
153 let mut buf = vec![0u8; 65536];
154
155 let _ = tx.send(BridgeEvent::Connected).await;
156
157 while *running.lock() {
158 match socket.recv_from(&mut buf).await {
159 Ok((len, from)) => {
160 debug!("OSC received {} bytes from {}", len, from);
161
162 match rosc::decoder::decode_udp(&buf[..len]) {
164 Ok((_, packet)) => {
165 if let Some(messages) = packet_to_messages(&packet, &namespace) {
166 for msg in messages {
167 if tx
168 .send(BridgeEvent::ToClasp(Box::new(msg)))
169 .await
170 .is_err()
171 {
172 break;
173 }
174 }
175 }
176 }
177 Err(e) => {
178 debug!("OSC decode error: {:?}", e);
179 }
180 }
181 }
182 Err(e) => {
183 error!("OSC receive error: {}", e);
184 let _ = tx.send(BridgeEvent::Error(e.to_string())).await;
185 }
186 }
187 }
188
189 let _ = tx.send(BridgeEvent::Disconnected { reason: None }).await;
190 });
191
192 Ok(rx)
193 }
194
195 async fn stop(&mut self) -> Result<()> {
196 *self.running.lock() = false;
197 self.socket = None;
198 info!("OSC bridge stopped");
199 Ok(())
200 }
201
202 async fn send(&self, message: Message) -> Result<()> {
203 let socket = self
204 .socket
205 .as_ref()
206 .ok_or_else(|| BridgeError::ConnectionFailed("Not connected".to_string()))?;
207
208 let remote = self
209 .osc_config
210 .remote_addr
211 .as_ref()
212 .ok_or_else(|| BridgeError::Send("No remote address configured".to_string()))?;
213
214 let remote_addr: SocketAddr = remote
215 .parse()
216 .map_err(|e| BridgeError::Send(format!("Invalid remote address: {}", e)))?;
217
218 let original_qos = message.default_qos();
220 if original_qos != QoS::Fire {
221 let address = match &message {
222 Message::Set(set) => &set.address,
223 Message::Publish(pub_msg) => &pub_msg.address,
224 _ => "",
225 };
226 warn!(
227 address = %address,
228 original_qos = ?original_qos,
229 "CLASP->OSC: QoS downgraded to Fire (UDP has no delivery guarantees)"
230 );
231 }
232
233 if let Some(packet) = self.clasp_to_osc(&message) {
234 let bytes = rosc::encoder::encode(&packet)
235 .map_err(|e| BridgeError::Protocol(format!("OSC encode error: {:?}", e)))?;
236
237 socket
238 .send_to(&bytes, remote_addr)
239 .await
240 .map_err(|e| BridgeError::Send(e.to_string()))?;
241
242 debug!("Sent OSC message to {}", remote_addr);
243 }
244
245 Ok(())
246 }
247
248 fn is_running(&self) -> bool {
249 *self.running.lock()
250 }
251
252 fn namespace(&self) -> &str {
253 &self.osc_config.namespace
254 }
255}
256
257fn osc_arg_to_value(arg: &OscType) -> Value {
259 match arg {
260 OscType::Int(i) => Value::Int(*i as i64),
261 OscType::Float(f) => Value::Float(*f as f64),
262 OscType::String(s) => Value::String(s.clone()),
263 OscType::Blob(b) => Value::Bytes(b.clone()),
264 OscType::Long(l) => Value::Int(*l),
265 OscType::Double(d) => Value::Float(*d),
266 OscType::Bool(b) => Value::Bool(*b),
267 OscType::Nil => Value::Null,
268 OscType::Inf => Value::Float(f64::INFINITY),
269 _ => Value::Null,
270 }
271}
272
273fn value_to_osc_args(value: &Value) -> Vec<OscType> {
275 match value {
276 Value::Null => vec![],
277 Value::Bool(b) => vec![OscType::Bool(*b)],
278 Value::Int(i) => vec![OscType::Long(*i)],
279 Value::Float(f) => vec![OscType::Double(*f)],
280 Value::String(s) => vec![OscType::String(s.clone())],
281 Value::Bytes(b) => vec![OscType::Blob(b.clone())],
282 Value::Array(arr) => arr.iter().flat_map(value_to_osc_args).collect(),
283 Value::Map(_) => vec![OscType::String(
284 serde_json::to_string(value).unwrap_or_default(),
285 )],
286 }
287}
288
289fn packet_to_messages(packet: &OscPacket, namespace: &str) -> Option<Vec<Message>> {
291 match packet {
292 OscPacket::Message(msg) => {
293 let address = format!("{}{}", namespace, msg.addr);
294 let value = if msg.args.is_empty() {
295 Value::Null
296 } else if msg.args.len() == 1 {
297 osc_arg_to_value(&msg.args[0])
298 } else {
299 Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
300 };
301
302 Some(vec![Message::Set(SetMessage {
303 address,
304 value,
305 revision: None,
306 lock: false,
307 unlock: false,
308 ttl: None,
309 })])
310 }
311 OscPacket::Bundle(bundle) => {
312 let messages: Vec<Message> = bundle
313 .content
314 .iter()
315 .filter_map(|p| packet_to_messages(p, namespace))
316 .flatten()
317 .collect();
318
319 if messages.is_empty() {
320 None
321 } else {
322 let timestamp = bundle.timetag.seconds as u64 * 1_000_000
324 + (bundle.timetag.fractional as u64 * 1_000_000 / u32::MAX as u64);
325
326 Some(vec![Message::Bundle(clasp_core::BundleMessage {
327 timestamp: Some(timestamp),
328 messages,
329 })])
330 }
331 }
332 }
333}
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Int, float and string OSC arguments map to the matching CLASP values.
    #[test]
    fn test_osc_arg_conversion() {
        let cases = vec![
            (OscType::Int(42), Value::Int(42)),
            (OscType::Float(0.5), Value::Float(0.5)),
            (
                OscType::String("test".to_string()),
                Value::String("test".to_string()),
            ),
        ];

        for (arg, expected) in cases {
            assert_eq!(osc_arg_to_value(&arg), expected);
        }
    }

    /// A CLASP float is sent as exactly one OSC double.
    #[test]
    fn test_value_to_osc() {
        let args = value_to_osc_args(&Value::Float(0.75));
        assert_eq!(args.len(), 1);

        match args.first() {
            Some(OscType::Double(f)) => assert!((f - 0.75).abs() < 0.001),
            _ => panic!("Expected Double"),
        }
    }
}