1use async_trait::async_trait;
4use clasp_core::{Message, PublishMessage, QoS, SetMessage, SignalType, Value};
5use parking_lot::Mutex;
6use rosc::{OscMessage, OscPacket, OscType};
7use std::net::SocketAddr;
8use std::sync::Arc;
9use tokio::net::UdpSocket;
10use tokio::sync::mpsc;
11use tracing::{debug, error, info, warn};
12
13use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
14
/// Settings specific to the OSC side of the bridge.
#[derive(Debug, Clone)]
pub struct OscBridgeConfig {
    /// Local address the UDP socket is bound to when the bridge is started.
    pub bind_addr: String,
    /// Destination for outgoing OSC packets; it is parsed as a socket address
    /// on every send, and sending fails while this is `None`.
    pub remote_addr: Option<String>,
    /// Prefix prepended to incoming OSC addresses and stripped from outgoing
    /// message addresses.
    pub namespace: String,
}
25
26impl Default for OscBridgeConfig {
27 fn default() -> Self {
28 Self {
29 bind_addr: "0.0.0.0:8000".to_string(),
30 remote_addr: None,
31 namespace: "/osc".to_string(),
32 }
33 }
34}
35
/// Bridge that exchanges OSC packets over UDP and translates them to and from
/// CLASP messages.
pub struct OscBridge {
    /// Generic bridge description returned by `Bridge::config`.
    config: BridgeConfig,
    /// OSC-specific settings (bind address, remote address, namespace).
    osc_config: OscBridgeConfig,
    /// UDP socket; set by `start`, cleared by `stop`, required by `send`.
    socket: Option<Arc<UdpSocket>>,
    /// Run flag shared with the spawned receive task, which re-checks it
    /// before each receive.
    running: Arc<Mutex<bool>>,
}
43
44impl OscBridge {
45 pub fn new(osc_config: OscBridgeConfig) -> Self {
46 let config = BridgeConfig {
47 name: "OSC Bridge".to_string(),
48 protocol: "osc".to_string(),
49 bidirectional: true,
50 ..Default::default()
51 };
52
53 Self {
54 config,
55 osc_config,
56 socket: None,
57 running: Arc::new(Mutex::new(false)),
58 }
59 }
60
61 fn osc_to_clasp(&self, msg: &OscMessage) -> Option<Message> {
63 let address = format!("{}{}", self.osc_config.namespace, msg.addr);
64
65 let value = if msg.args.is_empty() {
67 Value::Null
68 } else if msg.args.len() == 1 {
69 osc_arg_to_value(&msg.args[0])
70 } else {
71 Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
72 };
73
74 Some(Message::Set(SetMessage {
75 address,
76 value,
77 revision: None,
78 lock: false,
79 unlock: false,
80 }))
81 }
82
83 fn clasp_to_osc(&self, msg: &Message) -> Option<OscPacket> {
85 match msg {
86 Message::Set(set) => {
87 let addr = set
89 .address
90 .strip_prefix(&self.osc_config.namespace)
91 .unwrap_or(&set.address);
92
93 let args = value_to_osc_args(&set.value);
94
95 Some(OscPacket::Message(OscMessage {
96 addr: addr.to_string(),
97 args,
98 }))
99 }
100 Message::Publish(pub_msg) => {
101 let addr = pub_msg
102 .address
103 .strip_prefix(&self.osc_config.namespace)
104 .unwrap_or(&pub_msg.address);
105
106 let args = if let Some(ref value) = pub_msg.value {
107 value_to_osc_args(value)
108 } else if let Some(ref payload) = pub_msg.payload {
109 value_to_osc_args(payload)
110 } else {
111 vec![]
112 };
113
114 Some(OscPacket::Message(OscMessage {
115 addr: addr.to_string(),
116 args,
117 }))
118 }
119 _ => None,
120 }
121 }
122}
123
124#[async_trait]
125impl Bridge for OscBridge {
126 fn config(&self) -> &BridgeConfig {
127 &self.config
128 }
129
130 async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
131 if *self.running.lock() {
132 return Err(BridgeError::Other("Bridge already running".to_string()));
133 }
134
135 let socket = UdpSocket::bind(&self.osc_config.bind_addr)
136 .await
137 .map_err(|e| BridgeError::ConnectionFailed(e.to_string()))?;
138
139 info!("OSC bridge listening on {}", self.osc_config.bind_addr);
140
141 let socket = Arc::new(socket);
142 self.socket = Some(socket.clone());
143 *self.running.lock() = true;
144
145 let (tx, rx) = mpsc::channel(100);
146 let running = self.running.clone();
147 let namespace = self.osc_config.namespace.clone();
148
149 tokio::spawn(async move {
151 let mut buf = vec![0u8; 65536];
152
153 let _ = tx.send(BridgeEvent::Connected).await;
154
155 while *running.lock() {
156 match socket.recv_from(&mut buf).await {
157 Ok((len, from)) => {
158 debug!("OSC received {} bytes from {}", len, from);
159
160 match rosc::decoder::decode_udp(&buf[..len]) {
162 Ok((_, packet)) => {
163 if let Some(messages) = packet_to_messages(&packet, &namespace) {
164 for msg in messages {
165 if tx.send(BridgeEvent::ToClasp(msg)).await.is_err() {
166 break;
167 }
168 }
169 }
170 }
171 Err(e) => {
172 debug!("OSC decode error: {:?}", e);
173 }
174 }
175 }
176 Err(e) => {
177 error!("OSC receive error: {}", e);
178 let _ = tx.send(BridgeEvent::Error(e.to_string())).await;
179 }
180 }
181 }
182
183 let _ = tx.send(BridgeEvent::Disconnected { reason: None }).await;
184 });
185
186 Ok(rx)
187 }
188
189 async fn stop(&mut self) -> Result<()> {
190 *self.running.lock() = false;
191 self.socket = None;
192 info!("OSC bridge stopped");
193 Ok(())
194 }
195
196 async fn send(&self, message: Message) -> Result<()> {
197 let socket = self
198 .socket
199 .as_ref()
200 .ok_or_else(|| BridgeError::ConnectionFailed("Not connected".to_string()))?;
201
202 let remote = self
203 .osc_config
204 .remote_addr
205 .as_ref()
206 .ok_or_else(|| BridgeError::Send("No remote address configured".to_string()))?;
207
208 let remote_addr: SocketAddr = remote
209 .parse()
210 .map_err(|e| BridgeError::Send(format!("Invalid remote address: {}", e)))?;
211
212 let original_qos = message.default_qos();
214 if original_qos != QoS::Fire {
215 let address = match &message {
216 Message::Set(set) => &set.address,
217 Message::Publish(pub_msg) => &pub_msg.address,
218 _ => "",
219 };
220 warn!(
221 address = %address,
222 original_qos = ?original_qos,
223 "CLASP->OSC: QoS downgraded to Fire (UDP has no delivery guarantees)"
224 );
225 }
226
227 if let Some(packet) = self.clasp_to_osc(&message) {
228 let bytes = rosc::encoder::encode(&packet)
229 .map_err(|e| BridgeError::Protocol(format!("OSC encode error: {:?}", e)))?;
230
231 socket
232 .send_to(&bytes, remote_addr)
233 .await
234 .map_err(|e| BridgeError::Send(e.to_string()))?;
235
236 debug!("Sent OSC message to {}", remote_addr);
237 }
238
239 Ok(())
240 }
241
242 fn is_running(&self) -> bool {
243 *self.running.lock()
244 }
245
246 fn namespace(&self) -> &str {
247 &self.osc_config.namespace
248 }
249}
250
251fn osc_arg_to_value(arg: &OscType) -> Value {
253 match arg {
254 OscType::Int(i) => Value::Int(*i as i64),
255 OscType::Float(f) => Value::Float(*f as f64),
256 OscType::String(s) => Value::String(s.clone()),
257 OscType::Blob(b) => Value::Bytes(b.clone()),
258 OscType::Long(l) => Value::Int(*l),
259 OscType::Double(d) => Value::Float(*d),
260 OscType::Bool(b) => Value::Bool(*b),
261 OscType::Nil => Value::Null,
262 OscType::Inf => Value::Float(f64::INFINITY),
263 _ => Value::Null,
264 }
265}
266
267fn value_to_osc_args(value: &Value) -> Vec<OscType> {
269 match value {
270 Value::Null => vec![],
271 Value::Bool(b) => vec![OscType::Bool(*b)],
272 Value::Int(i) => vec![OscType::Long(*i)],
273 Value::Float(f) => vec![OscType::Double(*f)],
274 Value::String(s) => vec![OscType::String(s.clone())],
275 Value::Bytes(b) => vec![OscType::Blob(b.clone())],
276 Value::Array(arr) => arr.iter().flat_map(value_to_osc_args).collect(),
277 Value::Map(_) => vec![OscType::String(
278 serde_json::to_string(value).unwrap_or_default(),
279 )],
280 }
281}
282
283fn packet_to_messages(packet: &OscPacket, namespace: &str) -> Option<Vec<Message>> {
285 match packet {
286 OscPacket::Message(msg) => {
287 let address = format!("{}{}", namespace, msg.addr);
288 let value = if msg.args.is_empty() {
289 Value::Null
290 } else if msg.args.len() == 1 {
291 osc_arg_to_value(&msg.args[0])
292 } else {
293 Value::Array(msg.args.iter().map(osc_arg_to_value).collect())
294 };
295
296 Some(vec![Message::Set(SetMessage {
297 address,
298 value,
299 revision: None,
300 lock: false,
301 unlock: false,
302 })])
303 }
304 OscPacket::Bundle(bundle) => {
305 let messages: Vec<Message> = bundle
306 .content
307 .iter()
308 .filter_map(|p| packet_to_messages(p, namespace))
309 .flatten()
310 .collect();
311
312 if messages.is_empty() {
313 None
314 } else {
315 let timestamp = bundle.timetag.seconds as u64 * 1_000_000
317 + (bundle.timetag.fractional as u64 * 1_000_000 / u32::MAX as u64);
318
319 Some(vec![Message::Bundle(clasp_core::BundleMessage {
320 timestamp: Some(timestamp),
321 messages,
322 })])
323 }
324 }
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// Int, float and string OSC arguments map to the matching CLASP values.
    #[test]
    fn test_osc_arg_conversion() {
        let cases = vec![
            (OscType::Int(42), Value::Int(42)),
            (OscType::Float(0.5), Value::Float(0.5)),
            (
                OscType::String("test".to_string()),
                Value::String("test".to_string()),
            ),
        ];

        for (input, expected) in cases {
            assert_eq!(osc_arg_to_value(&input), expected);
        }
    }

    /// A CLASP float becomes exactly one OSC double carrying the same number.
    #[test]
    fn test_value_to_osc() {
        let converted = value_to_osc_args(&Value::Float(0.75));
        assert_eq!(converted.len(), 1);

        if let OscType::Double(number) = &converted[0] {
            assert!((number - 0.75).abs() < 0.001);
        } else {
            panic!("Expected Double");
        }
    }
}