Skip to main content

mabi_cli/commands/
protocol.rs

1//! Protocol-specific commands.
2//!
3//! Provides subcommands for each supported protocol.
4
5use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use serde::Serialize;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17
18// Protocol-specific imports
19use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
20use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
21use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
22use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
23
24/// Base trait for protocol-specific commands.
25#[async_trait]
26pub trait ProtocolCommand: Command {
27    /// Get the protocol type.
28    fn protocol(&self) -> Protocol;
29
30    /// Get the default port.
31    fn default_port(&self) -> u16;
32
33    /// Start the protocol server.
34    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
35
36    /// Stop the protocol server.
37    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
38}
39
40// =============================================================================
41// Modbus Command
42// =============================================================================
43
44/// Modbus protocol command.
45pub struct ModbusCommand {
46    /// Binding address.
47    bind_addr: SocketAddr,
48    /// Number of devices to simulate.
49    devices: usize,
50    /// Points per device.
51    points_per_device: usize,
52    /// Use RTU mode instead of TCP.
53    rtu_mode: bool,
54    /// Serial port for RTU mode.
55    serial_port: Option<String>,
56    /// Server instance (for shutdown).
57    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
58    /// Server task handle.
59    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
60}
61
62impl ModbusCommand {
63    pub fn new() -> Self {
64        Self {
65            bind_addr: "0.0.0.0:502".parse().unwrap(),
66            devices: 1,
67            points_per_device: 100,
68            rtu_mode: false,
69            serial_port: None,
70            server: Arc::new(Mutex::new(None)),
71            server_task: Arc::new(Mutex::new(None)),
72        }
73    }
74
75    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
76        self.bind_addr = addr;
77        self
78    }
79
80    pub fn with_port(mut self, port: u16) -> Self {
81        self.bind_addr.set_port(port);
82        self
83    }
84
85    pub fn with_devices(mut self, devices: usize) -> Self {
86        self.devices = devices;
87        self
88    }
89
90    pub fn with_points(mut self, points: usize) -> Self {
91        self.points_per_device = points;
92        self
93    }
94
95    pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
96        self.rtu_mode = true;
97        self.serial_port = Some(serial_port.into());
98        self
99    }
100}
101
102impl Default for ModbusCommand {
103    fn default() -> Self {
104        Self::new()
105    }
106}
107
108#[async_trait]
109impl Command for ModbusCommand {
110    fn name(&self) -> &str {
111        "modbus"
112    }
113
114    fn description(&self) -> &str {
115        "Start a Modbus TCP/RTU simulator"
116    }
117
118    fn requires_engine(&self) -> bool {
119        true
120    }
121
122    fn supports_shutdown(&self) -> bool {
123        true
124    }
125
126    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
127        let format = ctx.output().format();
128        let is_quiet = ctx.is_quiet();
129        let is_verbose = ctx.is_verbose();
130        let is_debug = ctx.is_debug();
131
132        if !is_quiet {
133            if matches!(format, OutputFormat::Table) {
134                let output = ctx.output();
135                if self.rtu_mode {
136                    output.header("Modbus RTU Simulator");
137                    output.kv(
138                        "Serial Port",
139                        self.serial_port.as_deref().unwrap_or("N/A"),
140                    );
141                } else {
142                    output.header("Modbus TCP Simulator");
143                    output.kv("Bind Address", self.bind_addr);
144                }
145                output.kv("Devices", self.devices);
146                output.kv("Points per Device", self.points_per_device);
147                output.kv("Total Points", self.devices * self.points_per_device);
148            }
149        }
150
151        // Verbose: show extra configuration details
152        if is_verbose {
153            ctx.vprintln(format!("  Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
154            ctx.vprintln(format!("  Points Distribution: {} per register type", self.points_per_device / 4));
155        }
156
157        // Debug: dump full configuration
158        if is_debug {
159            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
160            ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
161            ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
162        }
163
164        self.start_server(ctx).await?;
165
166        let points_per_type = self.points_per_device / 4;
167
168        if !is_quiet {
169            match format {
170                OutputFormat::Table => {
171                    let colors_enabled = ctx.colors_enabled();
172                    let builder = TableBuilder::new(colors_enabled)
173                        .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
174
175                    let devices = self.devices;
176                    let pts = points_per_type.to_string();
177                    let table = PaginatedTable::default().render(builder, devices, 6, |i| {
178                        let unit_id = (i + 1).to_string();
179                        (
180                            vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
181                            StatusType::Success,
182                        )
183                    });
184                    table.print();
185                }
186                _ => {
187                    #[derive(Serialize)]
188                    struct ModbusServerInfo {
189                        protocol: String,
190                        bind_address: String,
191                        devices: usize,
192                        points_per_device: usize,
193                        total_points: usize,
194                        rtu_mode: bool,
195                        serial_port: Option<String>,
196                        device_list: Vec<ModbusDeviceInfo>,
197                        status: String,
198                    }
199                    #[derive(Serialize)]
200                    struct ModbusDeviceInfo {
201                        unit_id: usize,
202                        holding_registers: usize,
203                        input_registers: usize,
204                        coils: usize,
205                        discrete_inputs: usize,
206                        status: String,
207                    }
208                    let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
209                        .map(|i| ModbusDeviceInfo {
210                            unit_id: i + 1,
211                            holding_registers: points_per_type,
212                            input_registers: points_per_type,
213                            coils: points_per_type,
214                            discrete_inputs: points_per_type,
215                            status: "Online".into(),
216                        })
217                        .collect();
218                    let info = ModbusServerInfo {
219                        protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
220                        bind_address: self.bind_addr.to_string(),
221                        devices: self.devices,
222                        points_per_device: self.points_per_device,
223                        total_points: self.devices * self.points_per_device,
224                        rtu_mode: self.rtu_mode,
225                        serial_port: self.serial_port.clone(),
226                        device_list,
227                        status: "Online".into(),
228                    };
229                    let _ = ctx.output().write(&info);
230                }
231            }
232        }
233
234        if !is_quiet {
235            ctx.output().info("Press Ctrl+C to stop");
236        }
237        ctx.shutdown_signal().notified().await;
238
239        self.stop_server(ctx).await?;
240        if !is_quiet {
241            ctx.output().success("Modbus simulator stopped");
242        }
243
244        Ok(CommandOutput::quiet_success())
245    }
246}
247
248#[async_trait]
249impl ProtocolCommand for ModbusCommand {
250    fn protocol(&self) -> Protocol {
251        if self.rtu_mode {
252            Protocol::ModbusRtu
253        } else {
254            Protocol::ModbusTcp
255        }
256    }
257
258    fn default_port(&self) -> u16 {
259        502
260    }
261
262    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
263        let output = ctx.output();
264        let spinner = output.spinner("Starting Modbus server...");
265
266        let config = ServerConfigV2 {
267            bind_address: self.bind_addr,
268            ..Default::default()
269        };
270
271        let server = Arc::new(ModbusTcpServerV2::new(config));
272
273        for i in 0..self.devices {
274            let unit_id = (i + 1) as u8;
275            let points = (self.points_per_device / 4) as u16;
276            let device_config = ModbusDeviceConfig {
277                unit_id,
278                name: format!("Device-{}", unit_id),
279                holding_registers: points,
280                input_registers: points,
281                coils: points,
282                discrete_inputs: points,
283                response_delay_ms: 0,
284            };
285            let device = ModbusDevice::new(device_config);
286            server.add_device(device);
287        }
288
289        {
290            let mut server_guard = self.server.lock().await;
291            *server_guard = Some(server.clone());
292        }
293
294        let server_clone = server.clone();
295        let task = tokio::spawn(async move {
296            if let Err(e) = server_clone.run().await {
297                tracing::error!("Modbus server error: {}", e);
298            }
299        });
300
301        {
302            let mut task_guard = self.server_task.lock().await;
303            *task_guard = Some(task);
304        }
305
306        tokio::time::sleep(Duration::from_millis(100)).await;
307
308        spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
309        Ok(())
310    }
311
312    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
313        if let Some(server) = self.server.lock().await.as_ref() {
314            server.shutdown();
315        }
316
317        if let Some(task) = self.server_task.lock().await.take() {
318            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
319        }
320
321        Ok(())
322    }
323}
324
325// =============================================================================
326// OPC UA Command
327// =============================================================================
328
329/// OPC UA protocol command.
330pub struct OpcuaCommand {
331    bind_addr: SocketAddr,
332    endpoint_path: String,
333    nodes: usize,
334    security_mode: String,
335    /// Server instance (for shutdown).
336    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
337    /// Server task handle.
338    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
339}
340
341impl OpcuaCommand {
342    pub fn new() -> Self {
343        Self {
344            bind_addr: "0.0.0.0:4840".parse().unwrap(),
345            endpoint_path: "/".into(),
346            nodes: 1000,
347            security_mode: "None".into(),
348            server: Arc::new(Mutex::new(None)),
349            server_task: Arc::new(Mutex::new(None)),
350        }
351    }
352
353    pub fn with_port(mut self, port: u16) -> Self {
354        self.bind_addr.set_port(port);
355        self
356    }
357
358    pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
359        self.endpoint_path = path.into();
360        self
361    }
362
363    pub fn with_nodes(mut self, nodes: usize) -> Self {
364        self.nodes = nodes;
365        self
366    }
367
368    pub fn with_security(mut self, mode: impl Into<String>) -> Self {
369        self.security_mode = mode.into();
370        self
371    }
372}
373
374impl Default for OpcuaCommand {
375    fn default() -> Self {
376        Self::new()
377    }
378}
379
380#[async_trait]
381impl Command for OpcuaCommand {
382    fn name(&self) -> &str {
383        "opcua"
384    }
385
386    fn description(&self) -> &str {
387        "Start an OPC UA server simulator"
388    }
389
390    fn requires_engine(&self) -> bool {
391        true
392    }
393
394    fn supports_shutdown(&self) -> bool {
395        true
396    }
397
398    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
399        let format = ctx.output().format();
400        let is_quiet = ctx.is_quiet();
401        let is_verbose = ctx.is_verbose();
402        let is_debug = ctx.is_debug();
403
404        if !is_quiet {
405            if matches!(format, OutputFormat::Table) {
406                let output = ctx.output();
407                output.header("OPC UA Simulator");
408                output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
409                output.kv("Nodes", self.nodes);
410                output.kv("Security Mode", &self.security_mode);
411            }
412        }
413
414        // Verbose: show extra details
415        if is_verbose {
416            ctx.vprintln(format!("  Bind Address: {}", self.bind_addr));
417            ctx.vprintln(format!("  Endpoint Path: {}", self.endpoint_path));
418            ctx.vprintln(format!("  Max Subscriptions: 1000"));
419            ctx.vprintln(format!("  Max Monitored Items: 10000"));
420        }
421
422        // Debug: dump full configuration
423        if is_debug {
424            ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
425            ctx.dprintln(format!("Node count: {}", self.nodes));
426            ctx.dprintln(format!("Security mode: {}", self.security_mode));
427            ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
428        }
429
430        self.start_server(ctx).await?;
431
432        if !is_quiet {
433            match format {
434                OutputFormat::Table => {
435                    let colors_enabled = ctx.colors_enabled();
436                    let table = TableBuilder::new(colors_enabled)
437                        .header(["Namespace", "Nodes", "Subscriptions", "Status"])
438                        .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
439                        .status_row(
440                            ["1", &self.nodes.to_string(), "0", "Online"],
441                            StatusType::Success,
442                        );
443                    table.print();
444                }
445                _ => {
446                    #[derive(Serialize)]
447                    struct OpcuaServerInfo {
448                        protocol: String,
449                        endpoint: String,
450                        nodes: usize,
451                        security_mode: String,
452                        namespaces: Vec<NamespaceInfo>,
453                        status: String,
454                    }
455                    #[derive(Serialize)]
456                    struct NamespaceInfo {
457                        index: u32,
458                        nodes: String,
459                        subscriptions: u32,
460                        status: String,
461                    }
462                    let info = OpcuaServerInfo {
463                        protocol: "OPC UA".into(),
464                        endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
465                        nodes: self.nodes,
466                        security_mode: self.security_mode.clone(),
467                        namespaces: vec![
468                            NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
469                            NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
470                        ],
471                        status: "Online".into(),
472                    };
473                    let _ = ctx.output().write(&info);
474                }
475            }
476        }
477
478        if !is_quiet {
479            ctx.output().info("Press Ctrl+C to stop");
480        }
481        ctx.shutdown_signal().notified().await;
482
483        self.stop_server(ctx).await?;
484        if !is_quiet {
485            ctx.output().success("OPC UA simulator stopped");
486        }
487
488        Ok(CommandOutput::quiet_success())
489    }
490}
491
492#[async_trait]
493impl ProtocolCommand for OpcuaCommand {
494    fn protocol(&self) -> Protocol {
495        Protocol::OpcUa
496    }
497
498    fn default_port(&self) -> u16 {
499        4840
500    }
501
502    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
503        let output = ctx.output();
504        let spinner = output.spinner("Starting OPC UA server...");
505
506        let config = OpcUaServerConfig {
507            endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
508            server_name: "Mabinogion OPC UA Simulator".to_string(),
509            max_subscriptions: 1000,
510            max_monitored_items: 10000,
511            ..Default::default()
512        };
513
514        let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
515            crate::error::CliError::ExecutionFailed {
516                message: format!("Failed to create OPC UA server: {}", e)
517            }
518        })?);
519
520        // Add sample nodes with diverse types and mixed read-only / writable access.
521        // Even-indexed nodes are writable, odd-indexed are read-only.
522        let node_count = self.nodes.min(100);
523        for i in 0..node_count {
524            let node_id = format!("ns=2;i={}", 1000 + i);
525            let name = format!("Variable_{}", i);
526            let value = (i as f64) * 0.1;
527
528            if i % 2 == 0 {
529                let _ = server.add_writable_variable(node_id, name, value);
530            } else {
531                let _ = server.add_variable(node_id, name, value);
532            }
533        }
534
535        {
536            let mut server_guard = self.server.lock().await;
537            *server_guard = Some(server.clone());
538        }
539
540        let server_clone = server.clone();
541        let task = tokio::spawn(async move {
542            if let Err(e) = server_clone.start().await {
543                tracing::error!("OPC UA server error: {}", e);
544            }
545        });
546
547        {
548            let mut task_guard = self.server_task.lock().await;
549            *task_guard = Some(task);
550        }
551
552        tokio::time::sleep(Duration::from_millis(100)).await;
553
554        spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
555        Ok(())
556    }
557
558    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
559        if let Some(server) = self.server.lock().await.as_ref() {
560            let _ = server.stop().await;
561        }
562
563        if let Some(task) = self.server_task.lock().await.take() {
564            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
565        }
566
567        Ok(())
568    }
569}
570
571// =============================================================================
572// BACnet Command
573// =============================================================================
574
575/// BACnet protocol command.
576pub struct BacnetCommand {
577    bind_addr: SocketAddr,
578    device_instance: u32,
579    objects: usize,
580    bbmd_enabled: bool,
581    /// Server instance (for shutdown).
582    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
583    /// Server task handle.
584    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
585}
586
587impl BacnetCommand {
588    pub fn new() -> Self {
589        Self {
590            bind_addr: "0.0.0.0:47808".parse().unwrap(),
591            device_instance: 1234,
592            objects: 100,
593            bbmd_enabled: false,
594            server: Arc::new(Mutex::new(None)),
595            server_task: Arc::new(Mutex::new(None)),
596        }
597    }
598
599    pub fn with_port(mut self, port: u16) -> Self {
600        self.bind_addr.set_port(port);
601        self
602    }
603
604    pub fn with_device_instance(mut self, instance: u32) -> Self {
605        self.device_instance = instance;
606        self
607    }
608
609    pub fn with_objects(mut self, objects: usize) -> Self {
610        self.objects = objects;
611        self
612    }
613
614    pub fn with_bbmd(mut self, enabled: bool) -> Self {
615        self.bbmd_enabled = enabled;
616        self
617    }
618}
619
620impl Default for BacnetCommand {
621    fn default() -> Self {
622        Self::new()
623    }
624}
625
626#[async_trait]
627impl Command for BacnetCommand {
628    fn name(&self) -> &str {
629        "bacnet"
630    }
631
632    fn description(&self) -> &str {
633        "Start a BACnet/IP simulator"
634    }
635
636    fn requires_engine(&self) -> bool {
637        true
638    }
639
640    fn supports_shutdown(&self) -> bool {
641        true
642    }
643
644    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
645        let format = ctx.output().format();
646        let is_quiet = ctx.is_quiet();
647        let is_verbose = ctx.is_verbose();
648        let is_debug = ctx.is_debug();
649
650        if !is_quiet {
651            if matches!(format, OutputFormat::Table) {
652                let output = ctx.output();
653                output.header("BACnet/IP Simulator");
654                output.kv("Bind Address", self.bind_addr);
655                output.kv("Device Instance", self.device_instance);
656                output.kv("Objects", self.objects);
657                output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
658            }
659        }
660
661        // Verbose: show extra details
662        if is_verbose {
663            let per_type = self.objects / 4;
664            ctx.vprintln(format!("  Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
665            ctx.vprintln(format!("  Device Name: Mabinogion BACnet Simulator"));
666        }
667
668        // Debug: dump full configuration
669        if is_debug {
670            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
671            ctx.dprintln(format!("Device instance: {}", self.device_instance));
672            ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
673        }
674
675        self.start_server(ctx).await?;
676
677        let per_type = self.objects / 4;
678
679        if !is_quiet {
680            match format {
681                OutputFormat::Table => {
682                    let colors_enabled = ctx.colors_enabled();
683                    let table = TableBuilder::new(colors_enabled)
684                        .header(["Object Type", "Count", "Status"])
685                        .status_row(["Device", "1", "Online"], StatusType::Success)
686                        .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
687                        .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
688                        .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
689                        .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
690                    table.print();
691                }
692                _ => {
693                    #[derive(Serialize)]
694                    struct BacnetServerInfo {
695                        protocol: String,
696                        bind_address: String,
697                        device_instance: u32,
698                        objects: usize,
699                        bbmd_enabled: bool,
700                        object_types: Vec<ObjectTypeInfo>,
701                        status: String,
702                    }
703                    #[derive(Serialize)]
704                    struct ObjectTypeInfo {
705                        object_type: String,
706                        count: usize,
707                        status: String,
708                    }
709                    let info = BacnetServerInfo {
710                        protocol: "BACnet/IP".into(),
711                        bind_address: self.bind_addr.to_string(),
712                        device_instance: self.device_instance,
713                        objects: self.objects,
714                        bbmd_enabled: self.bbmd_enabled,
715                        object_types: vec![
716                            ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
717                            ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
718                            ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
719                            ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
720                            ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
721                        ],
722                        status: "Online".into(),
723                    };
724                    let _ = ctx.output().write(&info);
725                }
726            }
727        }
728
729        if !is_quiet {
730            ctx.output().info("Press Ctrl+C to stop");
731        }
732        ctx.shutdown_signal().notified().await;
733
734        self.stop_server(ctx).await?;
735        if !is_quiet {
736            ctx.output().success("BACnet simulator stopped");
737        }
738
739        Ok(CommandOutput::quiet_success())
740    }
741}
742
743#[async_trait]
744impl ProtocolCommand for BacnetCommand {
745    fn protocol(&self) -> Protocol {
746        Protocol::BacnetIp
747    }
748
749    fn default_port(&self) -> u16 {
750        47808
751    }
752
753    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
754        let output = ctx.output();
755        let spinner = output.spinner("Starting BACnet server...");
756
757        let config = BacnetServerConfig::new(self.device_instance)
758            .with_bind_addr(self.bind_addr)
759            .with_device_name("Mabinogion BACnet Simulator");
760
761        // Create object registry with sample objects
762        let registry = ObjectRegistry::new();
763
764        let objects_per_type = self.objects / 4;
765        for i in 0..objects_per_type {
766            let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
767            registry.register(Arc::new(ai));
768        }
769        for i in 0..objects_per_type {
770            let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
771            registry.register(Arc::new(ao));
772        }
773        for i in 0..objects_per_type {
774            let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
775            registry.register(Arc::new(bi));
776        }
777        for i in 0..objects_per_type {
778            let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
779            registry.register(Arc::new(bo));
780        }
781
782        let server = Arc::new(BACnetServer::new(config, registry));
783
784        {
785            let mut server_guard = self.server.lock().await;
786            *server_guard = Some(server.clone());
787        }
788
789        let server_clone = server.clone();
790        let task = tokio::spawn(async move {
791            if let Err(e) = server_clone.run().await {
792                tracing::error!("BACnet server error: {}", e);
793            }
794        });
795
796        {
797            let mut task_guard = self.server_task.lock().await;
798            *task_guard = Some(task);
799        }
800
801        tokio::time::sleep(Duration::from_millis(100)).await;
802
803        spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
804        Ok(())
805    }
806
807    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
808        if let Some(server) = self.server.lock().await.as_ref() {
809            server.shutdown();
810        }
811
812        if let Some(task) = self.server_task.lock().await.take() {
813            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
814        }
815
816        Ok(())
817    }
818}
819
820// =============================================================================
821// KNX Command
822// =============================================================================
823
824/// KNX protocol command.
825pub struct KnxCommand {
826    bind_addr: SocketAddr,
827    individual_address: String,
828    group_objects: usize,
829    /// Server instance (for shutdown).
830    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
831    /// Server task handle.
832    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
833}
834
835impl KnxCommand {
836    pub fn new() -> Self {
837        Self {
838            bind_addr: "0.0.0.0:3671".parse().unwrap(),
839            individual_address: "1.1.1".into(),
840            group_objects: 100,
841            server: Arc::new(Mutex::new(None)),
842            server_task: Arc::new(Mutex::new(None)),
843        }
844    }
845
846    pub fn with_port(mut self, port: u16) -> Self {
847        self.bind_addr.set_port(port);
848        self
849    }
850
851    pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
852        self.individual_address = addr.into();
853        self
854    }
855
856    pub fn with_group_objects(mut self, count: usize) -> Self {
857        self.group_objects = count;
858        self
859    }
860}
861
862impl Default for KnxCommand {
863    fn default() -> Self {
864        Self::new()
865    }
866}
867
868#[async_trait]
869impl Command for KnxCommand {
870    fn name(&self) -> &str {
871        "knx"
872    }
873
874    fn description(&self) -> &str {
875        "Start a KNXnet/IP simulator"
876    }
877
878    fn requires_engine(&self) -> bool {
879        true
880    }
881
882    fn supports_shutdown(&self) -> bool {
883        true
884    }
885
886    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
887        let format = ctx.output().format();
888        let is_quiet = ctx.is_quiet();
889        let is_verbose = ctx.is_verbose();
890        let is_debug = ctx.is_debug();
891
892        if !is_quiet {
893            if matches!(format, OutputFormat::Table) {
894                let output = ctx.output();
895                output.header("KNXnet/IP Simulator");
896                output.kv("Bind Address", self.bind_addr);
897                output.kv("Individual Address", &self.individual_address);
898                output.kv("Group Objects", self.group_objects);
899            }
900        }
901
902        // Verbose: show extra details
903        if is_verbose {
904            ctx.vprintln(format!("  Max Connections: 10"));
905            ctx.vprintln(format!("  Services: Core, Device Management, Tunneling"));
906        }
907
908        // Debug: dump full configuration
909        if is_debug {
910            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
911            ctx.dprintln(format!("Individual address: {}", self.individual_address));
912            ctx.dprintln(format!("Group objects: {}", self.group_objects));
913        }
914
915        self.start_server(ctx).await?;
916
917        if !is_quiet {
918            match format {
919                OutputFormat::Table => {
920                    let colors_enabled = ctx.colors_enabled();
921                    let table = TableBuilder::new(colors_enabled)
922                        .header(["Service", "Status"])
923                        .status_row(["Core", "Ready"], StatusType::Success)
924                        .status_row(["Device Management", "Ready"], StatusType::Success)
925                        .status_row(["Tunneling", "Ready"], StatusType::Success);
926                    table.print();
927                }
928                _ => {
929                    #[derive(Serialize)]
930                    struct KnxServerInfo {
931                        protocol: String,
932                        bind_address: String,
933                        individual_address: String,
934                        group_objects: usize,
935                        services: Vec<ServiceInfo>,
936                        status: String,
937                    }
938                    #[derive(Serialize)]
939                    struct ServiceInfo {
940                        service: String,
941                        status: String,
942                    }
943                    let info = KnxServerInfo {
944                        protocol: "KNXnet/IP".into(),
945                        bind_address: self.bind_addr.to_string(),
946                        individual_address: self.individual_address.clone(),
947                        group_objects: self.group_objects,
948                        services: vec![
949                            ServiceInfo { service: "Core".into(), status: "Ready".into() },
950                            ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
951                            ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
952                        ],
953                        status: "Online".into(),
954                    };
955                    let _ = ctx.output().write(&info);
956                }
957            }
958        }
959
960        if !is_quiet {
961            ctx.output().info("Press Ctrl+C to stop");
962        }
963        ctx.shutdown_signal().notified().await;
964
965        self.stop_server(ctx).await?;
966        if !is_quiet {
967            ctx.output().success("KNX simulator stopped");
968        }
969
970        Ok(CommandOutput::quiet_success())
971    }
972}
973
974#[async_trait]
975impl ProtocolCommand for KnxCommand {
976    fn protocol(&self) -> Protocol {
977        Protocol::KnxIp
978    }
979
980    fn default_port(&self) -> u16 {
981        3671
982    }
983
984    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
985        let output = ctx.output();
986        let spinner = output.spinner("Starting KNX server...");
987
988        // Parse individual address
989        let individual_address: IndividualAddress = self.individual_address.parse()
990            .map_err(|_| crate::error::CliError::ExecutionFailed {
991                message: format!("Invalid individual address: {}", self.individual_address)
992            })?;
993
994        let config = KnxServerConfig {
995            bind_addr: self.bind_addr,
996            individual_address,
997            max_connections: 256,
998            ..Default::default()
999        };
1000
1001        // Create group objects based on --groups parameter
1002        let group_table = Arc::new(GroupObjectTable::new());
1003        let dpt_types = [
1004            DptId::new(1, 1),   // Switch (bool)
1005            DptId::new(5, 1),   // Scaling (0-100%)
1006            DptId::new(9, 1),   // Temperature (float16)
1007            DptId::new(9, 4),   // Lux
1008            DptId::new(9, 7),   // Humidity
1009            DptId::new(12, 1),  // Counter (u32)
1010            DptId::new(13, 1),  // Counter signed (i32)
1011            DptId::new(14, 56), // Float (f32)
1012        ];
1013        let dpt_names = [
1014            "Switch", "Scaling", "Temperature", "Lux",
1015            "Humidity", "Counter", "SignedCounter", "Float",
1016        ];
1017
1018        for i in 0..self.group_objects {
1019            let main = ((i / 256) + 1) as u8;
1020            let middle = ((i / 8) % 8) as u8;
1021            let sub = (i % 256) as u8;
1022            let addr = GroupAddress::three_level(main, middle, sub);
1023            let dpt_idx = i % dpt_types.len();
1024            let name = format!("{}_{}", dpt_names[dpt_idx], i);
1025            if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1026                tracing::warn!("Failed to create group object {}: {}", i, e);
1027            }
1028        }
1029
1030        let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1031
1032        {
1033            let mut server_guard = self.server.lock().await;
1034            *server_guard = Some(server.clone());
1035        }
1036
1037        let server_clone = server.clone();
1038        let task = tokio::spawn(async move {
1039            if let Err(e) = server_clone.start().await {
1040                tracing::error!("KNX server error: {}", e);
1041            }
1042        });
1043
1044        {
1045            let mut task_guard = self.server_task.lock().await;
1046            *task_guard = Some(task);
1047        }
1048
1049        tokio::time::sleep(Duration::from_millis(100)).await;
1050
1051        spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1052        Ok(())
1053    }
1054
1055    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1056        // Take server out to call stop (KnxServer::stop has Send issues with parking_lot)
1057        let server_opt = self.server.lock().await.take();
1058        if let Some(server) = server_opt {
1059            // Use spawn_blocking to handle the non-Send future
1060            let _ = tokio::task::spawn_blocking(move || {
1061                let rt = tokio::runtime::Handle::current();
1062                rt.block_on(async {
1063                    let _ = server.stop().await;
1064                })
1065            }).await;
1066        }
1067
1068        if let Some(task) = self.server_task.lock().await.take() {
1069            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1070        }
1071
1072        Ok(())
1073    }
1074}