// mabi_cli/commands/protocol.rs
1//! Protocol-specific commands.
2//!
3//! Provides subcommands for each supported protocol.
4
5use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16
17// Protocol-specific imports
18use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
19use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
20use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
21use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress};
22
23/// Base trait for protocol-specific commands.
24///
25/// Extends [`Command`] with protocol metadata and server lifecycle hooks.
26/// Implementors call `start_server` / `stop_server` from their own
27/// `execute` around the shutdown-signal wait.
28#[async_trait]
29pub trait ProtocolCommand: Command {
30    /// Protocol simulated by this command.
31    fn protocol(&self) -> Protocol;
32
33    /// Well-known default port for the protocol (e.g. 502 for Modbus TCP).
34    fn default_port(&self) -> u16;
35
36    /// Start the protocol server on a background task.
37    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
38
39    /// Stop the protocol server and wait for its background task to exit.
40    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
41}
38
39// =============================================================================
40// Modbus Command
41// =============================================================================
42
43/// Modbus protocol command.
44///
45/// Simulates one or more Modbus devices behind a single endpoint.
46/// Configure via the builder-style `with_*` methods.
47pub struct ModbusCommand {
48    /// Binding address.
49    bind_addr: SocketAddr,
50    /// Number of devices to simulate.
51    devices: usize,
52    /// Points per device, split (integer division by 4) across holding
53    /// registers, input registers, coils, and discrete inputs.
54    points_per_device: usize,
55    /// Use RTU mode instead of TCP.
56    ///
57    /// NOTE(review): only affects banner output and `protocol()`;
58    /// `start_server` always starts a TCP server — confirm RTU status.
59    rtu_mode: bool,
60    /// Serial port for RTU mode (display-only at present).
61    serial_port: Option<String>,
62    /// Server instance (for shutdown).
63    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
64    /// Server task handle.
65    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
66}
60
61impl ModbusCommand {
62    pub fn new() -> Self {
63        Self {
64            bind_addr: "0.0.0.0:502".parse().unwrap(),
65            devices: 1,
66            points_per_device: 100,
67            rtu_mode: false,
68            serial_port: None,
69            server: Arc::new(Mutex::new(None)),
70            server_task: Arc::new(Mutex::new(None)),
71        }
72    }
73
74    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
75        self.bind_addr = addr;
76        self
77    }
78
79    pub fn with_port(mut self, port: u16) -> Self {
80        self.bind_addr.set_port(port);
81        self
82    }
83
84    pub fn with_devices(mut self, devices: usize) -> Self {
85        self.devices = devices;
86        self
87    }
88
89    pub fn with_points(mut self, points: usize) -> Self {
90        self.points_per_device = points;
91        self
92    }
93
94    pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
95        self.rtu_mode = true;
96        self.serial_port = Some(serial_port.into());
97        self
98    }
99}
100
101impl Default for ModbusCommand {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
#[async_trait]
impl Command for ModbusCommand {
    /// CLI subcommand name.
    fn name(&self) -> &str {
        "modbus"
    }

    /// One-line help text.
    fn description(&self) -> &str {
        "Start a Modbus TCP/RTU simulator"
    }

    /// Reported to the runner; always `true` for this command.
    fn requires_engine(&self) -> bool {
        true
    }

    /// Graceful shutdown via `shutdown_signal` is supported.
    fn supports_shutdown(&self) -> bool {
        true
    }

    /// Print the configuration banner, start the server, show a summary
    /// table, then block until the shutdown signal fires and stop cleanly.
    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
        // Inner scope ends the `output` borrow of `ctx` before
        // `start_server(ctx)` needs `ctx` again.
        {
            let output = ctx.output();
            if self.rtu_mode {
                output.header("Modbus RTU Simulator");
                output.kv(
                    "Serial Port",
                    self.serial_port.as_deref().unwrap_or("N/A"),
                );
            } else {
                output.header("Modbus TCP Simulator");
                output.kv("Bind Address", self.bind_addr);
            }
            output.kv("Devices", self.devices);
            output.kv("Points per Device", self.points_per_device);
            output.kv("Total Points", self.devices * self.points_per_device);
        }

        self.start_server(ctx).await?;

        // NOTE(review): the table always shows a single row for unit 1,
        // even when `self.devices > 1` — confirm whether per-device rows
        // are wanted.
        let colors_enabled = ctx.colors_enabled();
        let table = TableBuilder::new(colors_enabled)
            .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"])
            .status_row(
                [
                    "1",
                    &(self.points_per_device / 4).to_string(),
                    &(self.points_per_device / 4).to_string(),
                    &(self.points_per_device / 4).to_string(),
                    &(self.points_per_device / 4).to_string(),
                    "Online",
                ],
                StatusType::Success,
            );
        table.print();

        ctx.output().info("Press Ctrl+C to stop");
        // Park until the runner delivers the shutdown notification.
        ctx.shutdown_signal().notified().await;

        self.stop_server(ctx).await?;
        ctx.output().success("Modbus simulator stopped");

        Ok(CommandOutput::quiet_success())
    }
}
170
171#[async_trait]
172impl ProtocolCommand for ModbusCommand {
173    fn protocol(&self) -> Protocol {
174        if self.rtu_mode {
175            Protocol::ModbusRtu
176        } else {
177            Protocol::ModbusTcp
178        }
179    }
180
181    fn default_port(&self) -> u16 {
182        502
183    }
184
185    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
186        let output = ctx.output();
187        let spinner = output.spinner("Starting Modbus server...");
188
189        let config = ServerConfigV2 {
190            bind_address: self.bind_addr,
191            ..Default::default()
192        };
193
194        let server = Arc::new(ModbusTcpServerV2::new(config));
195
196        for i in 0..self.devices {
197            let unit_id = (i + 1) as u8;
198            let points = (self.points_per_device / 4) as u16;
199            let device_config = ModbusDeviceConfig {
200                unit_id,
201                name: format!("Device-{}", unit_id),
202                holding_registers: points,
203                input_registers: points,
204                coils: points,
205                discrete_inputs: points,
206                response_delay_ms: 0,
207            };
208            let device = ModbusDevice::new(device_config);
209            server.add_device(device);
210        }
211
212        {
213            let mut server_guard = self.server.lock().await;
214            *server_guard = Some(server.clone());
215        }
216
217        let server_clone = server.clone();
218        let task = tokio::spawn(async move {
219            if let Err(e) = server_clone.run().await {
220                tracing::error!("Modbus server error: {}", e);
221            }
222        });
223
224        {
225            let mut task_guard = self.server_task.lock().await;
226            *task_guard = Some(task);
227        }
228
229        tokio::time::sleep(Duration::from_millis(100)).await;
230
231        spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
232        Ok(())
233    }
234
235    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
236        if let Some(server) = self.server.lock().await.as_ref() {
237            server.shutdown();
238        }
239
240        if let Some(task) = self.server_task.lock().await.take() {
241            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
242        }
243
244        Ok(())
245    }
246}
247
248// =============================================================================
249// OPC UA Command
250// =============================================================================
251
252/// OPC UA protocol command.
253///
254/// Simulates an OPC UA server; configure via the `with_*` builders.
255pub struct OpcuaCommand {
256    /// TCP bind address for the endpoint.
257    bind_addr: SocketAddr,
258    /// Path appended to `opc.tcp://<addr>` to form the endpoint URL.
259    endpoint_path: String,
260    /// Requested number of simulated nodes (display value; `start_server`
261    /// registers at most 100 — see note there).
262    nodes: usize,
263    /// Advertised security mode label (e.g. "None").
264    security_mode: String,
265    /// Server instance (for shutdown).
266    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
267    /// Server task handle.
268    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
269}
263
264impl OpcuaCommand {
265    pub fn new() -> Self {
266        Self {
267            bind_addr: "0.0.0.0:4840".parse().unwrap(),
268            endpoint_path: "/".into(),
269            nodes: 1000,
270            security_mode: "None".into(),
271            server: Arc::new(Mutex::new(None)),
272            server_task: Arc::new(Mutex::new(None)),
273        }
274    }
275
276    pub fn with_port(mut self, port: u16) -> Self {
277        self.bind_addr.set_port(port);
278        self
279    }
280
281    pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
282        self.endpoint_path = path.into();
283        self
284    }
285
286    pub fn with_nodes(mut self, nodes: usize) -> Self {
287        self.nodes = nodes;
288        self
289    }
290
291    pub fn with_security(mut self, mode: impl Into<String>) -> Self {
292        self.security_mode = mode.into();
293        self
294    }
295}
296
297impl Default for OpcuaCommand {
298    fn default() -> Self {
299        Self::new()
300    }
301}
302
#[async_trait]
impl Command for OpcuaCommand {
    /// CLI subcommand name.
    fn name(&self) -> &str {
        "opcua"
    }

    /// One-line help text.
    fn description(&self) -> &str {
        "Start an OPC UA server simulator"
    }

    /// Reported to the runner; always `true` for this command.
    fn requires_engine(&self) -> bool {
        true
    }

    /// Graceful shutdown via `shutdown_signal` is supported.
    fn supports_shutdown(&self) -> bool {
        true
    }

    /// Print the banner, start the server, show a namespace summary,
    /// then wait for shutdown and stop the server.
    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
        // Inner scope ends the `output` borrow before `start_server(ctx)`.
        {
            let output = ctx.output();
            output.header("OPC UA Simulator");
            output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
            output.kv("Nodes", self.nodes);
            output.kv("Security Mode", &self.security_mode);
        }

        self.start_server(ctx).await?;

        // NOTE(review): the table reports `self.nodes` under namespace 1,
        // but `start_server` registers at most 100 variables (in ns=2) —
        // the display can overstate; confirm intent.
        let colors_enabled = ctx.colors_enabled();
        let table = TableBuilder::new(colors_enabled)
            .header(["Namespace", "Nodes", "Subscriptions", "Status"])
            .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
            .status_row(
                ["1", &self.nodes.to_string(), "0", "Online"],
                StatusType::Success,
            );
        table.print();

        ctx.output().info("Press Ctrl+C to stop");
        ctx.shutdown_signal().notified().await;

        self.stop_server(ctx).await?;
        ctx.output().success("OPC UA simulator stopped");

        Ok(CommandOutput::quiet_success())
    }
}
351
352#[async_trait]
353impl ProtocolCommand for OpcuaCommand {
354    fn protocol(&self) -> Protocol {
355        Protocol::OpcUa
356    }
357
358    fn default_port(&self) -> u16 {
359        4840
360    }
361
362    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
363        let output = ctx.output();
364        let spinner = output.spinner("Starting OPC UA server...");
365
366        let config = OpcUaServerConfig {
367            endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
368            server_name: "Mabinogion OPC UA Simulator".to_string(),
369            max_subscriptions: 1000,
370            max_monitored_items: 10000,
371            ..Default::default()
372        };
373
374        let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
375            crate::error::CliError::ExecutionFailed {
376                message: format!("Failed to create OPC UA server: {}", e)
377            }
378        })?);
379
380        // Add sample nodes
381        for i in 0..self.nodes.min(100) {
382            let node_id = format!("ns=2;i={}", 1000 + i);
383            let _ = server.add_variable(node_id, format!("Variable_{}", i), (i as f64) * 0.1);
384        }
385
386        {
387            let mut server_guard = self.server.lock().await;
388            *server_guard = Some(server.clone());
389        }
390
391        let server_clone = server.clone();
392        let task = tokio::spawn(async move {
393            if let Err(e) = server_clone.start().await {
394                tracing::error!("OPC UA server error: {}", e);
395            }
396        });
397
398        {
399            let mut task_guard = self.server_task.lock().await;
400            *task_guard = Some(task);
401        }
402
403        tokio::time::sleep(Duration::from_millis(100)).await;
404
405        spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
406        Ok(())
407    }
408
409    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
410        if let Some(server) = self.server.lock().await.as_ref() {
411            let _ = server.stop().await;
412        }
413
414        if let Some(task) = self.server_task.lock().await.take() {
415            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
416        }
417
418        Ok(())
419    }
420}
421
422// =============================================================================
423// BACnet Command
424// =============================================================================
425
426/// BACnet protocol command.
427///
428/// Simulates a BACnet/IP device; configure via the `with_*` builders.
429pub struct BacnetCommand {
430    /// UDP bind address.
431    bind_addr: SocketAddr,
432    /// BACnet device object instance number.
433    device_instance: u32,
434    /// Total object budget, split (integer division by 4) across
435    /// AI/AO/BI/BO object types.
436    objects: usize,
437    /// Whether BBMD is reported as enabled.
438    ///
439    /// NOTE(review): only used for display in `execute`; `start_server`
440    /// does not read it — confirm BBMD support status.
441    bbmd_enabled: bool,
442    /// Server instance (for shutdown).
443    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
444    /// Server task handle.
445    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
446}
437
438impl BacnetCommand {
439    pub fn new() -> Self {
440        Self {
441            bind_addr: "0.0.0.0:47808".parse().unwrap(),
442            device_instance: 1234,
443            objects: 100,
444            bbmd_enabled: false,
445            server: Arc::new(Mutex::new(None)),
446            server_task: Arc::new(Mutex::new(None)),
447        }
448    }
449
450    pub fn with_port(mut self, port: u16) -> Self {
451        self.bind_addr.set_port(port);
452        self
453    }
454
455    pub fn with_device_instance(mut self, instance: u32) -> Self {
456        self.device_instance = instance;
457        self
458    }
459
460    pub fn with_objects(mut self, objects: usize) -> Self {
461        self.objects = objects;
462        self
463    }
464
465    pub fn with_bbmd(mut self, enabled: bool) -> Self {
466        self.bbmd_enabled = enabled;
467        self
468    }
469}
470
471impl Default for BacnetCommand {
472    fn default() -> Self {
473        Self::new()
474    }
475}
476
#[async_trait]
impl Command for BacnetCommand {
    /// CLI subcommand name.
    fn name(&self) -> &str {
        "bacnet"
    }

    /// One-line help text.
    fn description(&self) -> &str {
        "Start a BACnet/IP simulator"
    }

    /// Reported to the runner; always `true` for this command.
    fn requires_engine(&self) -> bool {
        true
    }

    /// Graceful shutdown via `shutdown_signal` is supported.
    fn supports_shutdown(&self) -> bool {
        true
    }

    /// Print the banner, start the server, show an object summary table,
    /// then wait for shutdown and stop cleanly.
    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
        // Inner scope ends the `output` borrow before `start_server(ctx)`.
        {
            let output = ctx.output();
            output.header("BACnet/IP Simulator");
            output.kv("Bind Address", self.bind_addr);
            output.kv("Device Instance", self.device_instance);
            output.kv("Objects", self.objects);
            output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
        }

        self.start_server(ctx).await?;

        // Counts mirror the per-type split done in `start_server`.
        let colors_enabled = ctx.colors_enabled();
        let table = TableBuilder::new(colors_enabled)
            .header(["Object Type", "Count", "Status"])
            .status_row(["Device", "1", "Online"], StatusType::Success)
            .status_row(["Analog Input", &(self.objects / 4).to_string(), "Active"], StatusType::Success)
            .status_row(["Analog Output", &(self.objects / 4).to_string(), "Active"], StatusType::Success)
            .status_row(["Binary Input", &(self.objects / 4).to_string(), "Active"], StatusType::Success)
            .status_row(["Binary Output", &(self.objects / 4).to_string(), "Active"], StatusType::Success);
        table.print();

        ctx.output().info("Press Ctrl+C to stop");
        ctx.shutdown_signal().notified().await;

        self.stop_server(ctx).await?;
        ctx.output().success("BACnet simulator stopped");

        Ok(CommandOutput::quiet_success())
    }
}
526
527#[async_trait]
528impl ProtocolCommand for BacnetCommand {
529    fn protocol(&self) -> Protocol {
530        Protocol::BacnetIp
531    }
532
533    fn default_port(&self) -> u16 {
534        47808
535    }
536
537    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
538        let output = ctx.output();
539        let spinner = output.spinner("Starting BACnet server...");
540
541        let config = BacnetServerConfig::new(self.device_instance)
542            .with_bind_addr(self.bind_addr)
543            .with_device_name("Mabinogion BACnet Simulator");
544
545        // Create object registry with sample objects
546        let registry = ObjectRegistry::new();
547
548        let objects_per_type = self.objects / 4;
549        for i in 0..objects_per_type {
550            let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
551            registry.register(Arc::new(ai));
552        }
553        for i in 0..objects_per_type {
554            let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
555            registry.register(Arc::new(ao));
556        }
557        for i in 0..objects_per_type {
558            let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
559            registry.register(Arc::new(bi));
560        }
561        for i in 0..objects_per_type {
562            let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
563            registry.register(Arc::new(bo));
564        }
565
566        let server = Arc::new(BACnetServer::new(config, registry));
567
568        {
569            let mut server_guard = self.server.lock().await;
570            *server_guard = Some(server.clone());
571        }
572
573        let server_clone = server.clone();
574        let task = tokio::spawn(async move {
575            if let Err(e) = server_clone.run().await {
576                tracing::error!("BACnet server error: {}", e);
577            }
578        });
579
580        {
581            let mut task_guard = self.server_task.lock().await;
582            *task_guard = Some(task);
583        }
584
585        tokio::time::sleep(Duration::from_millis(100)).await;
586
587        spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
588        Ok(())
589    }
590
591    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
592        if let Some(server) = self.server.lock().await.as_ref() {
593            server.shutdown();
594        }
595
596        if let Some(task) = self.server_task.lock().await.take() {
597            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
598        }
599
600        Ok(())
601    }
602}
603
604// =============================================================================
605// KNX Command
606// =============================================================================
607
608/// KNX protocol command.
609///
610/// Simulates a KNXnet/IP gateway; configure via the `with_*` builders.
611pub struct KnxCommand {
612    /// UDP bind address.
613    bind_addr: SocketAddr,
614    /// KNX individual address as text (parsed in `start_server`).
615    individual_address: String,
616    /// Number of group objects.
617    ///
618    /// NOTE(review): only used for display in `execute`; `start_server`
619    /// does not register group objects — confirm intent.
620    group_objects: usize,
621    /// Server instance (for shutdown).
622    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
623    /// Server task handle.
624    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
625}
618
619impl KnxCommand {
620    pub fn new() -> Self {
621        Self {
622            bind_addr: "0.0.0.0:3671".parse().unwrap(),
623            individual_address: "1.1.1".into(),
624            group_objects: 100,
625            server: Arc::new(Mutex::new(None)),
626            server_task: Arc::new(Mutex::new(None)),
627        }
628    }
629
630    pub fn with_port(mut self, port: u16) -> Self {
631        self.bind_addr.set_port(port);
632        self
633    }
634
635    pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
636        self.individual_address = addr.into();
637        self
638    }
639
640    pub fn with_group_objects(mut self, count: usize) -> Self {
641        self.group_objects = count;
642        self
643    }
644}
645
646impl Default for KnxCommand {
647    fn default() -> Self {
648        Self::new()
649    }
650}
651
#[async_trait]
impl Command for KnxCommand {
    /// CLI subcommand name.
    fn name(&self) -> &str {
        "knx"
    }

    /// One-line help text.
    fn description(&self) -> &str {
        "Start a KNXnet/IP simulator"
    }

    /// Reported to the runner; always `true` for this command.
    fn requires_engine(&self) -> bool {
        true
    }

    /// Graceful shutdown via `shutdown_signal` is supported.
    fn supports_shutdown(&self) -> bool {
        true
    }

    /// Print the banner, start the server, show the service table, then
    /// wait for shutdown and stop cleanly.
    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
        // Inner scope ends the `output` borrow before `start_server(ctx)`.
        {
            let output = ctx.output();
            output.header("KNXnet/IP Simulator");
            output.kv("Bind Address", self.bind_addr);
            output.kv("Individual Address", &self.individual_address);
            output.kv("Group Objects", self.group_objects);
        }

        self.start_server(ctx).await?;

        let colors_enabled = ctx.colors_enabled();
        let table = TableBuilder::new(colors_enabled)
            .header(["Service", "Status"])
            .status_row(["Core", "Ready"], StatusType::Success)
            .status_row(["Device Management", "Ready"], StatusType::Success)
            .status_row(["Tunneling", "Ready"], StatusType::Success);
        table.print();

        ctx.output().info("Press Ctrl+C to stop");
        ctx.shutdown_signal().notified().await;

        self.stop_server(ctx).await?;
        ctx.output().success("KNX simulator stopped");

        Ok(CommandOutput::quiet_success())
    }
}
698
#[async_trait]
impl ProtocolCommand for KnxCommand {
    /// Always KNXnet/IP.
    fn protocol(&self) -> Protocol {
        Protocol::KnxIp
    }

    /// Standard KNXnet/IP UDP port.
    fn default_port(&self) -> u16 {
        3671
    }

    /// Parse the configured individual address, build the server, and
    /// spawn it on a background task.
    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
        let output = ctx.output();
        let spinner = output.spinner("Starting KNX server...");

        // Parse individual address up front so a malformed value fails
        // with an error naming the offending input.
        let individual_address: IndividualAddress = self.individual_address.parse()
            .map_err(|_| crate::error::CliError::ExecutionFailed {
                message: format!("Invalid individual address: {}", self.individual_address)
            })?;

        let config = KnxServerConfig {
            bind_addr: self.bind_addr,
            individual_address,
            max_connections: 10,
            ..Default::default()
        };

        let server = Arc::new(KnxServer::new(config));

        // Stash the handle so `stop_server` can reach it later.
        {
            let mut server_guard = self.server.lock().await;
            *server_guard = Some(server.clone());
        }

        let server_clone = server.clone();
        let task = tokio::spawn(async move {
            if let Err(e) = server_clone.start().await {
                tracing::error!("KNX server error: {}", e);
            }
        });

        {
            let mut task_guard = self.server_task.lock().await;
            *task_guard = Some(task);
        }

        // Give the listener a moment to bind before reporting success.
        tokio::time::sleep(Duration::from_millis(100)).await;

        spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
        Ok(())
    }

    /// Stop the server, working around a non-`Send` stop future.
    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
        // Take server out to call stop (KnxServer::stop has Send issues with parking_lot)
        // Note: `take()` means the handle is gone afterwards — stop is one-shot.
        let server_opt = self.server.lock().await.take();
        if let Some(server) = server_opt {
            // Use spawn_blocking to handle the non-Send future; the tokio
            // runtime context propagates into spawn_blocking threads, where
            // `Handle::block_on` is permitted.
            let _ = tokio::task::spawn_blocking(move || {
                let rt = tokio::runtime::Handle::current();
                rt.block_on(async {
                    let _ = server.stop().await;
                })
            }).await;
        }

        if let Some(task) = self.server_task.lock().await.take() {
            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
        }

        Ok(())
    }
}