1use async_trait::async_trait;
6use clasp_core::Message;
7use parking_lot::Mutex;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use tracing::{debug, error, info, warn};
11
12use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
13
/// Kind of DMX output backend the bridge drives.
///
/// The default is [`DmxInterfaceType::Virtual`], which needs no hardware.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum DmxInterfaceType {
    /// ENTTEC DMX USB Pro widget.
    EnttecPro,
    /// ENTTEC "Open"-style interface; handled together with [`DmxInterfaceType::Ftdi`]
    /// by the output thread.
    EnttecOpen,
    /// Generic FTDI-based serial DMX interface.
    Ftdi,
    /// Software-only mode: channel updates are tracked and logged, nothing is transmitted.
    #[default]
    Virtual,
}
27
28#[derive(Debug, Clone)]
30pub struct DmxBridgeConfig {
31 pub port: Option<String>,
33 pub interface_type: DmxInterfaceType,
35 pub universe: u16,
37 pub namespace: String,
39 pub refresh_rate: f64,
41}
42
43impl Default for DmxBridgeConfig {
44 fn default() -> Self {
45 Self {
46 port: None,
47 interface_type: DmxInterfaceType::Virtual,
48 universe: 0,
49 namespace: "/dmx".to_string(),
50 refresh_rate: 44.0, }
52 }
53}
54
55struct DmxSender {
57 tx: std::sync::mpsc::Sender<DmxCommand>,
58}
59
/// Commands delivered to the DMX output thread.
// The payloads are not read by every backend yet, hence the lint exemption.
#[allow(dead_code)]
enum DmxCommand {
    /// Update one channel: `(channel number, value)`, with the channel counted from 1.
    SetChannel(u16, u8),
    /// Replace the whole 512-slot frame; boxed so the enum itself stays small.
    SetFrame(Box<[u8; 512]>),
    /// Ask the output thread to leave its loop.
    Stop,
}
66
67pub struct DmxBridge {
69 config: BridgeConfig,
70 dmx_config: DmxBridgeConfig,
71 running: Arc<Mutex<bool>>,
72 tx: Option<mpsc::Sender<BridgeEvent>>,
73 dmx_sender: Option<DmxSender>,
74 dmx_state: Arc<Mutex<[u8; 512]>>,
76 _output_thread: Option<std::thread::JoinHandle<()>>,
78}
79
80impl DmxBridge {
81 pub fn new(dmx_config: DmxBridgeConfig) -> Self {
82 let config = BridgeConfig {
83 name: format!("DMX Bridge (Universe {})", dmx_config.universe),
84 protocol: "dmx".to_string(),
85 bidirectional: false, ..Default::default()
87 };
88
89 Self {
90 config,
91 dmx_config,
92 running: Arc::new(Mutex::new(false)),
93 tx: None,
94 dmx_sender: None,
95 dmx_state: Arc::new(Mutex::new([0u8; 512])),
96 _output_thread: None,
97 }
98 }
99
100 pub fn list_ports() -> Result<Vec<String>> {
102 #[cfg(target_os = "macos")]
104 {
105 let ports: Vec<String> = std::fs::read_dir("/dev")
106 .map(|entries| {
107 entries
108 .filter_map(|e| e.ok())
109 .map(|e| e.path().to_string_lossy().to_string())
110 .filter(|p| p.contains("tty.usbserial") || p.contains("cu.usbserial"))
111 .collect()
112 })
113 .unwrap_or_default();
114 Ok(ports)
115 }
116
117 #[cfg(target_os = "linux")]
118 {
119 let ports: Vec<String> = std::fs::read_dir("/dev")
120 .map(|entries| {
121 entries
122 .filter_map(|e| e.ok())
123 .map(|e| e.path().to_string_lossy().to_string())
124 .filter(|p| p.contains("ttyUSB") || p.contains("ttyACM"))
125 .collect()
126 })
127 .unwrap_or_default();
128 Ok(ports)
129 }
130
131 #[cfg(target_os = "windows")]
132 {
133 Ok(vec![
135 "COM1".to_string(),
136 "COM2".to_string(),
137 "COM3".to_string(),
138 ])
139 }
140
141 #[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
142 {
143 Ok(vec![])
144 }
145 }
146
147 pub fn set_channel(&self, channel: u16, value: u8) {
149 if channel > 0 && channel <= 512 {
150 let mut state = self.dmx_state.lock();
151 state[(channel - 1) as usize] = value;
152
153 if let Some(sender) = &self.dmx_sender {
154 let _ = sender.tx.send(DmxCommand::SetChannel(channel, value));
155 }
156 }
157 }
158
159 pub fn set_frame(&self, data: &[u8; 512]) {
161 *self.dmx_state.lock() = *data;
162
163 if let Some(sender) = &self.dmx_sender {
164 let _ = sender.tx.send(DmxCommand::SetFrame(Box::new(*data)));
165 }
166 }
167
168 pub fn get_channel(&self, channel: u16) -> Option<u8> {
170 if channel > 0 && channel <= 512 {
171 Some(self.dmx_state.lock()[(channel - 1) as usize])
172 } else {
173 None
174 }
175 }
176}
177
178#[async_trait]
179impl Bridge for DmxBridge {
180 fn config(&self) -> &BridgeConfig {
181 &self.config
182 }
183
184 async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
185 if *self.running.lock() {
186 return Err(BridgeError::Other("Bridge already running".to_string()));
187 }
188
189 let (tx, rx) = mpsc::channel(100);
190 self.tx = Some(tx.clone());
191
192 let (dmx_tx, dmx_rx) = std::sync::mpsc::channel::<DmxCommand>();
193 self.dmx_sender = Some(DmxSender { tx: dmx_tx });
194
195 let port_path = self.dmx_config.port.clone();
196 let interface_type = self.dmx_config.interface_type;
197 let refresh_rate = self.dmx_config.refresh_rate;
198 let running = self.running.clone();
199 let dmx_state = self.dmx_state.clone();
200
201 let output_thread = std::thread::spawn(move || {
203 let refresh_interval = std::time::Duration::from_secs_f64(1.0 / refresh_rate);
204
205 match interface_type {
206 DmxInterfaceType::Virtual => {
207 info!("DMX bridge started in virtual mode");
208
209 while *running.lock() {
211 match dmx_rx.recv_timeout(refresh_interval) {
212 Ok(DmxCommand::SetChannel(ch, val)) => {
213 debug!("Virtual DMX: Channel {} = {}", ch, val);
214 }
215 Ok(DmxCommand::SetFrame(_)) => {
216 debug!("Virtual DMX: Frame updated");
217 }
218 Ok(DmxCommand::Stop) => break,
219 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
220 let _frame = *dmx_state.lock();
222 }
224 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
225 }
226 }
227 }
228 DmxInterfaceType::EnttecPro => {
229 if let Some(port) = port_path {
230 info!("Opening ENTTEC DMX USB Pro on {}", port);
231 warn!("ENTTEC Pro not yet implemented, using virtual mode");
239 } else {
240 error!("No port specified for ENTTEC Pro");
241 }
242
243 while *running.lock() {
245 match dmx_rx.recv_timeout(refresh_interval) {
246 Ok(DmxCommand::Stop) => break,
247 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
248 _ => {}
249 }
250 }
251 }
252 DmxInterfaceType::EnttecOpen | DmxInterfaceType::Ftdi => {
253 if let Some(port) = port_path {
254 info!("Opening DMX interface on {}", port);
255 warn!("FTDI DMX not yet implemented, using virtual mode");
261 } else {
262 error!("No port specified for DMX interface");
263 }
264
265 while *running.lock() {
266 match dmx_rx.recv_timeout(refresh_interval) {
267 Ok(DmxCommand::Stop) => break,
268 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
269 _ => {}
270 }
271 }
272 }
273 }
274
275 info!("DMX output thread stopped");
276 });
277
278 self._output_thread = Some(output_thread);
279 *self.running.lock() = true;
280
281 let _ = tx.send(BridgeEvent::Connected).await;
282 Ok(rx)
283 }
284
285 async fn stop(&mut self) -> Result<()> {
286 *self.running.lock() = false;
287
288 if let Some(sender) = &self.dmx_sender {
289 let _ = sender.tx.send(DmxCommand::Stop);
290 }
291
292 self.tx = None;
293 self.dmx_sender = None;
294 self._output_thread = None;
295 info!("DMX bridge stopped");
296 Ok(())
297 }
298
299 async fn send(&self, message: Message) -> Result<()> {
300 if let Message::Set(set) = &message {
301 let parts: Vec<&str> = set.address.split('/').collect();
303
304 if parts.len() >= 4 {
305 let universe: u16 = parts[2]
306 .parse()
307 .map_err(|_| BridgeError::Mapping("Invalid universe".to_string()))?;
308
309 if universe != self.dmx_config.universe {
311 return Ok(());
312 }
313
314 let channel: u16 = parts[3]
315 .parse()
316 .map_err(|_| BridgeError::Mapping("Invalid channel".to_string()))?;
317
318 if channel > 0 && channel <= 512 {
319 let value = set.value.as_i64().unwrap_or(0).clamp(0, 255) as u8;
320 self.set_channel(channel, value);
321 }
322 }
323 }
324
325 Ok(())
326 }
327
328 fn is_running(&self) -> bool {
329 *self.running.lock()
330 }
331
332 fn namespace(&self) -> &str {
333 &self.dmx_config.namespace
334 }
335}
336
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration targets universe 0 under `/dmx` in virtual mode.
    #[test]
    fn test_config_default() {
        let DmxBridgeConfig {
            namespace,
            universe,
            interface_type,
            ..
        } = DmxBridgeConfig::default();

        assert_eq!(namespace, "/dmx");
        assert_eq!(universe, 0);
        assert_eq!(interface_type, DmxInterfaceType::Virtual);
    }

    /// Channel numbers are 1-based on read; 0 and 513 fall outside the frame.
    #[test]
    fn test_channel_operations() {
        let bridge = DmxBridge::new(DmxBridgeConfig::default());

        // Write straight into the shared frame; the temporary guard is released
        // at the end of the statement.
        bridge.dmx_state.lock()[0] = 127;
        assert_eq!(bridge.get_channel(1), Some(127));

        for out_of_range in [0u16, 513] {
            assert_eq!(bridge.get_channel(out_of_range), None);
        }
    }
}