1use async_trait::async_trait;
6use clasp_core::{Message, SetMessage, Value};
7use parking_lot::Mutex;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use tracing::{debug, error, info, warn};
11
12use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
13
/// Kind of DMX output interface the bridge drives.
///
/// Only [`DmxInterfaceType::Virtual`] performs no hardware access; the other
/// variants select a hardware back end that is addressed through a port.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DmxInterfaceType {
    /// ENTTEC DMX USB Pro interface.
    EnttecPro,
    /// ENTTEC "open" style interface (presumably the Open DMX USB; confirm
    /// against the hardware documentation).
    EnttecOpen,
    /// Generic FTDI-based DMX interface.
    Ftdi,
    /// No hardware: channel updates are only tracked and logged.
    Virtual,
}
26
27impl Default for DmxInterfaceType {
28 fn default() -> Self {
29 Self::Virtual
30 }
31}
32
33#[derive(Debug, Clone)]
35pub struct DmxBridgeConfig {
36 pub port: Option<String>,
38 pub interface_type: DmxInterfaceType,
40 pub universe: u16,
42 pub namespace: String,
44 pub refresh_rate: f64,
46}
47
48impl Default for DmxBridgeConfig {
49 fn default() -> Self {
50 Self {
51 port: None,
52 interface_type: DmxInterfaceType::Virtual,
53 universe: 0,
54 namespace: "/dmx".to_string(),
55 refresh_rate: 44.0, }
57 }
58}
59
60struct DmxSender {
62 tx: std::sync::mpsc::Sender<DmxCommand>,
63}
64
/// Commands delivered to the DMX output thread.
enum DmxCommand {
    /// Set one channel: `(channel, value)`, where `channel` is the 1-based
    /// channel number.
    SetChannel(u16, u8),
    /// Replace the whole 512-slot frame.
    SetFrame([u8; 512]),
    /// Ask the output thread to leave its loop.
    Stop,
}
70
71pub struct DmxBridge {
73 config: BridgeConfig,
74 dmx_config: DmxBridgeConfig,
75 running: Arc<Mutex<bool>>,
76 tx: Option<mpsc::Sender<BridgeEvent>>,
77 dmx_sender: Option<DmxSender>,
78 dmx_state: Arc<Mutex<[u8; 512]>>,
80 _output_thread: Option<std::thread::JoinHandle<()>>,
82}
83
84impl DmxBridge {
85 pub fn new(dmx_config: DmxBridgeConfig) -> Self {
86 let config = BridgeConfig {
87 name: format!("DMX Bridge (Universe {})", dmx_config.universe),
88 protocol: "dmx".to_string(),
89 bidirectional: false, ..Default::default()
91 };
92
93 Self {
94 config,
95 dmx_config,
96 running: Arc::new(Mutex::new(false)),
97 tx: None,
98 dmx_sender: None,
99 dmx_state: Arc::new(Mutex::new([0u8; 512])),
100 _output_thread: None,
101 }
102 }
103
104 pub fn list_ports() -> Result<Vec<String>> {
106 #[cfg(target_os = "macos")]
108 {
109 let ports: Vec<String> = std::fs::read_dir("/dev")
110 .map(|entries| {
111 entries
112 .filter_map(|e| e.ok())
113 .map(|e| e.path().to_string_lossy().to_string())
114 .filter(|p| p.contains("tty.usbserial") || p.contains("cu.usbserial"))
115 .collect()
116 })
117 .unwrap_or_default();
118 Ok(ports)
119 }
120
121 #[cfg(target_os = "linux")]
122 {
123 let ports: Vec<String> = std::fs::read_dir("/dev")
124 .map(|entries| {
125 entries
126 .filter_map(|e| e.ok())
127 .map(|e| e.path().to_string_lossy().to_string())
128 .filter(|p| p.contains("ttyUSB") || p.contains("ttyACM"))
129 .collect()
130 })
131 .unwrap_or_default();
132 Ok(ports)
133 }
134
135 #[cfg(target_os = "windows")]
136 {
137 Ok(vec![
139 "COM1".to_string(),
140 "COM2".to_string(),
141 "COM3".to_string(),
142 ])
143 }
144
145 #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
146 {
147 Ok(vec![])
148 }
149 }
150
151 pub fn set_channel(&self, channel: u16, value: u8) {
153 if channel > 0 && channel <= 512 {
154 let mut state = self.dmx_state.lock();
155 state[(channel - 1) as usize] = value;
156
157 if let Some(sender) = &self.dmx_sender {
158 let _ = sender.tx.send(DmxCommand::SetChannel(channel, value));
159 }
160 }
161 }
162
163 pub fn set_frame(&self, data: &[u8; 512]) {
165 *self.dmx_state.lock() = *data;
166
167 if let Some(sender) = &self.dmx_sender {
168 let _ = sender.tx.send(DmxCommand::SetFrame(*data));
169 }
170 }
171
172 pub fn get_channel(&self, channel: u16) -> Option<u8> {
174 if channel > 0 && channel <= 512 {
175 Some(self.dmx_state.lock()[(channel - 1) as usize])
176 } else {
177 None
178 }
179 }
180}
181
182#[async_trait]
183impl Bridge for DmxBridge {
184 fn config(&self) -> &BridgeConfig {
185 &self.config
186 }
187
188 async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
189 if *self.running.lock() {
190 return Err(BridgeError::Other("Bridge already running".to_string()));
191 }
192
193 let (tx, rx) = mpsc::channel(100);
194 self.tx = Some(tx.clone());
195
196 let (dmx_tx, dmx_rx) = std::sync::mpsc::channel::<DmxCommand>();
197 self.dmx_sender = Some(DmxSender { tx: dmx_tx });
198
199 let port_path = self.dmx_config.port.clone();
200 let interface_type = self.dmx_config.interface_type;
201 let refresh_rate = self.dmx_config.refresh_rate;
202 let running = self.running.clone();
203 let dmx_state = self.dmx_state.clone();
204
205 let output_thread = std::thread::spawn(move || {
207 let refresh_interval = std::time::Duration::from_secs_f64(1.0 / refresh_rate);
208
209 match interface_type {
210 DmxInterfaceType::Virtual => {
211 info!("DMX bridge started in virtual mode");
212
213 while *running.lock() {
215 match dmx_rx.recv_timeout(refresh_interval) {
216 Ok(DmxCommand::SetChannel(ch, val)) => {
217 debug!("Virtual DMX: Channel {} = {}", ch, val);
218 }
219 Ok(DmxCommand::SetFrame(_)) => {
220 debug!("Virtual DMX: Frame updated");
221 }
222 Ok(DmxCommand::Stop) => break,
223 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
224 let _frame = *dmx_state.lock();
226 }
228 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
229 }
230 }
231 }
232 DmxInterfaceType::EnttecPro => {
233 if let Some(port) = port_path {
234 info!("Opening ENTTEC DMX USB Pro on {}", port);
235 warn!("ENTTEC Pro not yet implemented, using virtual mode");
243 } else {
244 error!("No port specified for ENTTEC Pro");
245 }
246
247 while *running.lock() {
249 match dmx_rx.recv_timeout(refresh_interval) {
250 Ok(DmxCommand::Stop) => break,
251 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
252 _ => {}
253 }
254 }
255 }
256 DmxInterfaceType::EnttecOpen | DmxInterfaceType::Ftdi => {
257 if let Some(port) = port_path {
258 info!("Opening DMX interface on {}", port);
259 warn!("FTDI DMX not yet implemented, using virtual mode");
265 } else {
266 error!("No port specified for DMX interface");
267 }
268
269 while *running.lock() {
270 match dmx_rx.recv_timeout(refresh_interval) {
271 Ok(DmxCommand::Stop) => break,
272 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
273 _ => {}
274 }
275 }
276 }
277 }
278
279 info!("DMX output thread stopped");
280 });
281
282 self._output_thread = Some(output_thread);
283 *self.running.lock() = true;
284
285 let _ = tx.send(BridgeEvent::Connected).await;
286 Ok(rx)
287 }
288
289 async fn stop(&mut self) -> Result<()> {
290 *self.running.lock() = false;
291
292 if let Some(sender) = &self.dmx_sender {
293 let _ = sender.tx.send(DmxCommand::Stop);
294 }
295
296 self.tx = None;
297 self.dmx_sender = None;
298 self._output_thread = None;
299 info!("DMX bridge stopped");
300 Ok(())
301 }
302
303 async fn send(&self, message: Message) -> Result<()> {
304 match &message {
305 Message::Set(set) => {
306 let parts: Vec<&str> = set.address.split('/').collect();
308
309 if parts.len() >= 4 {
310 let universe: u16 = parts[2]
311 .parse()
312 .map_err(|_| BridgeError::Mapping("Invalid universe".to_string()))?;
313
314 if universe != self.dmx_config.universe {
316 return Ok(());
317 }
318
319 let channel: u16 = parts[3]
320 .parse()
321 .map_err(|_| BridgeError::Mapping("Invalid channel".to_string()))?;
322
323 if channel > 0 && channel <= 512 {
324 let value = set.value.as_i64().unwrap_or(0).clamp(0, 255) as u8;
325 self.set_channel(channel, value);
326 }
327 }
328 }
329 _ => {}
330 }
331
332 Ok(())
333 }
334
335 fn is_running(&self) -> bool {
336 *self.running.lock()
337 }
338
339 fn namespace(&self) -> &str {
340 &self.dmx_config.namespace
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration targets universe 0 under `/dmx` with the
    /// virtual interface.
    #[test]
    fn test_config_default() {
        let DmxBridgeConfig {
            namespace,
            universe,
            interface_type,
            ..
        } = DmxBridgeConfig::default();

        assert_eq!(namespace, "/dmx");
        assert_eq!(universe, 0);
        assert_eq!(interface_type, DmxInterfaceType::Virtual);
    }

    /// Channel numbers are 1-based and anything outside `1..=512` is rejected.
    #[test]
    fn test_channel_operations() {
        let bridge = DmxBridge::new(DmxBridgeConfig::default());

        // Write slot 0 directly; it must be visible as channel 1.
        bridge.dmx_state.lock()[0] = 127;
        assert_eq!(bridge.get_channel(1), Some(127));

        for &out_of_range in &[0u16, 513] {
            assert_eq!(bridge.get_channel(out_of_range), None);
        }
    }
}