1use async_trait::async_trait;
4use clasp_core::{Message, QoS, SetMessage, Value};
5use parking_lot::Mutex;
6use rosc::{OscMessage, OscPacket, OscType};
7use std::net::SocketAddr;
8use std::sync::Arc;
9use tokio::net::UdpSocket;
10use tokio::sync::mpsc;
11use tracing::{debug, error, info, warn};
12
13use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
14
15#[derive(Debug, Clone)]
17pub struct OscBridgeConfig {
18 pub bind_addr: String,
20 pub remote_addr: Option<String>,
22 pub namespace: String,
24}
25
26impl Default for OscBridgeConfig {
27 fn default() -> Self {
28 Self {
29 bind_addr: "0.0.0.0:8000".to_string(),
30 remote_addr: None,
31 namespace: "/osc".to_string(),
32 }
33 }
34}
35
36pub struct OscBridge {
38 config: BridgeConfig,
39 osc_config: OscBridgeConfig,
40 socket: Option<Arc<UdpSocket>>,
41 running: Arc<Mutex<bool>>,
42}
43
44impl OscBridge {
45 pub fn new(osc_config: OscBridgeConfig) -> Self {
46 let config = BridgeConfig {
47 name: "OSC Bridge".to_string(),
48 protocol: "osc".to_string(),
49 bidirectional: true,
50 ..Default::default()
51 };
52
53 Self {
54 config,
55 osc_config,
56 socket: None,
57 running: Arc::new(Mutex::new(false)),
58 }
59 }
60
61 #[allow(dead_code)]
63 fn osc_to_clasp(&self, msg: &OscMessage) -> Option<Message> {
64 let address = format!("{}{}", self.osc_config.namespace, msg.addr);
65
66 let value = if msg.args.is_empty() {
68 Value::Null
69 } else if msg.args.len() == 1 {
70 osc_arg_to_value(&msg.args[0])
71 } else {
72 Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
73 };
74
75 Some(Message::Set(SetMessage {
76 address,
77 value,
78 revision: None,
79 lock: false,
80 unlock: false,
81 }))
82 }
83
84 fn clasp_to_osc(&self, msg: &Message) -> Option<OscPacket> {
86 match msg {
87 Message::Set(set) => {
88 let addr = set
90 .address
91 .strip_prefix(&self.osc_config.namespace)
92 .unwrap_or(&set.address);
93
94 let args = value_to_osc_args(&set.value);
95
96 Some(OscPacket::Message(OscMessage {
97 addr: addr.to_string(),
98 args,
99 }))
100 }
101 Message::Publish(pub_msg) => {
102 let addr = pub_msg
103 .address
104 .strip_prefix(&self.osc_config.namespace)
105 .unwrap_or(&pub_msg.address);
106
107 let args = if let Some(ref value) = pub_msg.value {
108 value_to_osc_args(value)
109 } else if let Some(ref payload) = pub_msg.payload {
110 value_to_osc_args(payload)
111 } else {
112 vec![]
113 };
114
115 Some(OscPacket::Message(OscMessage {
116 addr: addr.to_string(),
117 args,
118 }))
119 }
120 _ => None,
121 }
122 }
123}
124
125#[async_trait]
126impl Bridge for OscBridge {
127 fn config(&self) -> &BridgeConfig {
128 &self.config
129 }
130
131 async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
132 if *self.running.lock() {
133 return Err(BridgeError::Other("Bridge already running".to_string()));
134 }
135
136 let socket = UdpSocket::bind(&self.osc_config.bind_addr)
137 .await
138 .map_err(|e| BridgeError::ConnectionFailed(e.to_string()))?;
139
140 info!("OSC bridge listening on {}", self.osc_config.bind_addr);
141
142 let socket = Arc::new(socket);
143 self.socket = Some(socket.clone());
144 *self.running.lock() = true;
145
146 let (tx, rx) = mpsc::channel(100);
147 let running = self.running.clone();
148 let namespace = self.osc_config.namespace.clone();
149
150 tokio::spawn(async move {
152 let mut buf = vec![0u8; 65536];
153
154 let _ = tx.send(BridgeEvent::Connected).await;
155
156 while *running.lock() {
157 match socket.recv_from(&mut buf).await {
158 Ok((len, from)) => {
159 debug!("OSC received {} bytes from {}", len, from);
160
161 match rosc::decoder::decode_udp(&buf[..len]) {
163 Ok((_, packet)) => {
164 if let Some(messages) = packet_to_messages(&packet, &namespace) {
165 for msg in messages {
166 if tx
167 .send(BridgeEvent::ToClasp(Box::new(msg)))
168 .await
169 .is_err()
170 {
171 break;
172 }
173 }
174 }
175 }
176 Err(e) => {
177 debug!("OSC decode error: {:?}", e);
178 }
179 }
180 }
181 Err(e) => {
182 error!("OSC receive error: {}", e);
183 let _ = tx.send(BridgeEvent::Error(e.to_string())).await;
184 }
185 }
186 }
187
188 let _ = tx.send(BridgeEvent::Disconnected { reason: None }).await;
189 });
190
191 Ok(rx)
192 }
193
194 async fn stop(&mut self) -> Result<()> {
195 *self.running.lock() = false;
196 self.socket = None;
197 info!("OSC bridge stopped");
198 Ok(())
199 }
200
201 async fn send(&self, message: Message) -> Result<()> {
202 let socket = self
203 .socket
204 .as_ref()
205 .ok_or_else(|| BridgeError::ConnectionFailed("Not connected".to_string()))?;
206
207 let remote = self
208 .osc_config
209 .remote_addr
210 .as_ref()
211 .ok_or_else(|| BridgeError::Send("No remote address configured".to_string()))?;
212
213 let remote_addr: SocketAddr = remote
214 .parse()
215 .map_err(|e| BridgeError::Send(format!("Invalid remote address: {}", e)))?;
216
217 let original_qos = message.default_qos();
219 if original_qos != QoS::Fire {
220 let address = match &message {
221 Message::Set(set) => &set.address,
222 Message::Publish(pub_msg) => &pub_msg.address,
223 _ => "",
224 };
225 warn!(
226 address = %address,
227 original_qos = ?original_qos,
228 "CLASP->OSC: QoS downgraded to Fire (UDP has no delivery guarantees)"
229 );
230 }
231
232 if let Some(packet) = self.clasp_to_osc(&message) {
233 let bytes = rosc::encoder::encode(&packet)
234 .map_err(|e| BridgeError::Protocol(format!("OSC encode error: {:?}", e)))?;
235
236 socket
237 .send_to(&bytes, remote_addr)
238 .await
239 .map_err(|e| BridgeError::Send(e.to_string()))?;
240
241 debug!("Sent OSC message to {}", remote_addr);
242 }
243
244 Ok(())
245 }
246
247 fn is_running(&self) -> bool {
248 *self.running.lock()
249 }
250
251 fn namespace(&self) -> &str {
252 &self.osc_config.namespace
253 }
254}
255
256fn osc_arg_to_value(arg: &OscType) -> Value {
258 match arg {
259 OscType::Int(i) => Value::Int(*i as i64),
260 OscType::Float(f) => Value::Float(*f as f64),
261 OscType::String(s) => Value::String(s.clone()),
262 OscType::Blob(b) => Value::Bytes(b.clone()),
263 OscType::Long(l) => Value::Int(*l),
264 OscType::Double(d) => Value::Float(*d),
265 OscType::Bool(b) => Value::Bool(*b),
266 OscType::Nil => Value::Null,
267 OscType::Inf => Value::Float(f64::INFINITY),
268 _ => Value::Null,
269 }
270}
271
272fn value_to_osc_args(value: &Value) -> Vec<OscType> {
274 match value {
275 Value::Null => vec![],
276 Value::Bool(b) => vec![OscType::Bool(*b)],
277 Value::Int(i) => vec![OscType::Long(*i)],
278 Value::Float(f) => vec![OscType::Double(*f)],
279 Value::String(s) => vec![OscType::String(s.clone())],
280 Value::Bytes(b) => vec![OscType::Blob(b.clone())],
281 Value::Array(arr) => arr.iter().flat_map(value_to_osc_args).collect(),
282 Value::Map(_) => vec![OscType::String(
283 serde_json::to_string(value).unwrap_or_default(),
284 )],
285 }
286}
287
288fn packet_to_messages(packet: &OscPacket, namespace: &str) -> Option<Vec<Message>> {
290 match packet {
291 OscPacket::Message(msg) => {
292 let address = format!("{}{}", namespace, msg.addr);
293 let value = if msg.args.is_empty() {
294 Value::Null
295 } else if msg.args.len() == 1 {
296 osc_arg_to_value(&msg.args[0])
297 } else {
298 Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
299 };
300
301 Some(vec![Message::Set(SetMessage {
302 address,
303 value,
304 revision: None,
305 lock: false,
306 unlock: false,
307 })])
308 }
309 OscPacket::Bundle(bundle) => {
310 let messages: Vec<Message> = bundle
311 .content
312 .iter()
313 .filter_map(|p| packet_to_messages(p, namespace))
314 .flatten()
315 .collect();
316
317 if messages.is_empty() {
318 None
319 } else {
320 let timestamp = bundle.timetag.seconds as u64 * 1_000_000
322 + (bundle.timetag.fractional as u64 * 1_000_000 / u32::MAX as u64);
323
324 Some(vec![Message::Bundle(clasp_core::BundleMessage {
325 timestamp: Some(timestamp),
326 messages,
327 })])
328 }
329 }
330 }
331}
332
333#[cfg(test)]
334mod tests {
335 use super::*;
336
337 #[test]
338 fn test_osc_arg_conversion() {
339 assert_eq!(osc_arg_to_value(&OscType::Int(42)), Value::Int(42));
340 assert_eq!(osc_arg_to_value(&OscType::Float(0.5)), Value::Float(0.5));
341 assert_eq!(
342 osc_arg_to_value(&OscType::String("test".to_string())),
343 Value::String("test".to_string())
344 );
345 }
346
347 #[test]
348 fn test_value_to_osc() {
349 let args = value_to_osc_args(&Value::Float(0.75));
350 assert_eq!(args.len(), 1);
351 match &args[0] {
352 OscType::Double(f) => assert!((f - 0.75).abs() < 0.001),
353 _ => panic!("Expected Double"),
354 }
355 }
356}