Skip to main content

mabi_cli/commands/
protocol.rs

1//! Protocol-specific commands.
2//!
3//! Provides subcommands for each supported protocol.
4
5use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19// Protocol-specific imports
20use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
21use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
22use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
23use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
24
25/// Base trait for protocol-specific commands.
26#[async_trait]
27pub trait ProtocolCommand: Command {
28    /// Get the protocol type.
29    fn protocol(&self) -> Protocol;
30
31    /// Get the default port.
32    fn default_port(&self) -> u16;
33
34    /// Start the protocol server.
35    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
36
37    /// Stop the protocol server.
38    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
39}
40
41// =============================================================================
42// Modbus Command
43// =============================================================================
44
45/// Modbus protocol command.
46pub struct ModbusCommand {
47    /// Binding address.
48    bind_addr: SocketAddr,
49    /// Number of devices to simulate.
50    devices: usize,
51    /// Points per device.
52    points_per_device: usize,
53    /// Use RTU mode instead of TCP.
54    rtu_mode: bool,
55    /// Serial port for RTU mode.
56    serial_port: Option<String>,
57    /// Device tags.
58    tags: Tags,
59    /// Server instance (for shutdown).
60    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
61    /// Server task handle.
62    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
63}
64
65impl ModbusCommand {
66    pub fn new() -> Self {
67        Self {
68            bind_addr: "0.0.0.0:502".parse().unwrap(),
69            devices: 1,
70            points_per_device: 100,
71            rtu_mode: false,
72            serial_port: None,
73            tags: Tags::new(),
74            server: Arc::new(Mutex::new(None)),
75            server_task: Arc::new(Mutex::new(None)),
76        }
77    }
78
79    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
80        self.bind_addr = addr;
81        self
82    }
83
84    pub fn with_port(mut self, port: u16) -> Self {
85        self.bind_addr.set_port(port);
86        self
87    }
88
89    pub fn with_devices(mut self, devices: usize) -> Self {
90        self.devices = devices;
91        self
92    }
93
94    pub fn with_points(mut self, points: usize) -> Self {
95        self.points_per_device = points;
96        self
97    }
98
99    pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
100        self.rtu_mode = true;
101        self.serial_port = Some(serial_port.into());
102        self
103    }
104
105    pub fn with_tags(mut self, tags: Tags) -> Self {
106        self.tags = tags;
107        self
108    }
109}
110
111impl Default for ModbusCommand {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117#[async_trait]
118impl Command for ModbusCommand {
119    fn name(&self) -> &str {
120        "modbus"
121    }
122
123    fn description(&self) -> &str {
124        "Start a Modbus TCP/RTU simulator"
125    }
126
127    fn requires_engine(&self) -> bool {
128        true
129    }
130
131    fn supports_shutdown(&self) -> bool {
132        true
133    }
134
135    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
136        let format = ctx.output().format();
137        let is_quiet = ctx.is_quiet();
138        let is_verbose = ctx.is_verbose();
139        let is_debug = ctx.is_debug();
140
141        if !is_quiet {
142            if matches!(format, OutputFormat::Table) {
143                let output = ctx.output();
144                if self.rtu_mode {
145                    output.header("Modbus RTU Simulator");
146                    output.kv(
147                        "Serial Port",
148                        self.serial_port.as_deref().unwrap_or("N/A"),
149                    );
150                } else {
151                    output.header("Modbus TCP Simulator");
152                    output.kv("Bind Address", self.bind_addr);
153                }
154                output.kv("Devices", self.devices);
155                output.kv("Points per Device", self.points_per_device);
156                output.kv("Total Points", self.devices * self.points_per_device);
157            }
158        }
159
160        // Verbose: show extra configuration details
161        if is_verbose {
162            ctx.vprintln(format!("  Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
163            ctx.vprintln(format!("  Points Distribution: {} per register type", self.points_per_device / 4));
164        }
165
166        // Debug: dump full configuration
167        if is_debug {
168            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
169            ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
170            ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
171        }
172
173        self.start_server(ctx).await?;
174
175        let points_per_type = self.points_per_device / 4;
176
177        if !is_quiet {
178            match format {
179                OutputFormat::Table => {
180                    let colors_enabled = ctx.colors_enabled();
181                    let builder = TableBuilder::new(colors_enabled)
182                        .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
183
184                    let devices = self.devices;
185                    let pts = points_per_type.to_string();
186                    let table = PaginatedTable::default().render(builder, devices, 6, |i| {
187                        let unit_id = (i + 1).to_string();
188                        (
189                            vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
190                            StatusType::Success,
191                        )
192                    });
193                    table.print();
194                }
195                _ => {
196                    #[derive(Serialize)]
197                    struct ModbusServerInfo {
198                        protocol: String,
199                        bind_address: String,
200                        devices: usize,
201                        points_per_device: usize,
202                        total_points: usize,
203                        rtu_mode: bool,
204                        serial_port: Option<String>,
205                        device_list: Vec<ModbusDeviceInfo>,
206                        status: String,
207                    }
208                    #[derive(Serialize)]
209                    struct ModbusDeviceInfo {
210                        unit_id: usize,
211                        holding_registers: usize,
212                        input_registers: usize,
213                        coils: usize,
214                        discrete_inputs: usize,
215                        status: String,
216                    }
217                    let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
218                        .map(|i| ModbusDeviceInfo {
219                            unit_id: i + 1,
220                            holding_registers: points_per_type,
221                            input_registers: points_per_type,
222                            coils: points_per_type,
223                            discrete_inputs: points_per_type,
224                            status: "Online".into(),
225                        })
226                        .collect();
227                    let info = ModbusServerInfo {
228                        protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
229                        bind_address: self.bind_addr.to_string(),
230                        devices: self.devices,
231                        points_per_device: self.points_per_device,
232                        total_points: self.devices * self.points_per_device,
233                        rtu_mode: self.rtu_mode,
234                        serial_port: self.serial_port.clone(),
235                        device_list,
236                        status: "Online".into(),
237                    };
238                    let _ = ctx.output().write(&info);
239                }
240            }
241        }
242
243        if !is_quiet {
244            ctx.output().info("Press Ctrl+C to stop");
245        }
246        ctx.shutdown_signal().notified().await;
247
248        self.stop_server(ctx).await?;
249        if !is_quiet {
250            ctx.output().success("Modbus simulator stopped");
251        }
252
253        Ok(CommandOutput::quiet_success())
254    }
255}
256
257#[async_trait]
258impl ProtocolCommand for ModbusCommand {
259    fn protocol(&self) -> Protocol {
260        if self.rtu_mode {
261            Protocol::ModbusRtu
262        } else {
263            Protocol::ModbusTcp
264        }
265    }
266
267    fn default_port(&self) -> u16 {
268        502
269    }
270
271    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
272        let output = ctx.output();
273        let spinner = output.spinner("Starting Modbus server...");
274
275        let config = ServerConfigV2 {
276            bind_address: self.bind_addr,
277            ..Default::default()
278        };
279
280        let server = Arc::new(ModbusTcpServerV2::new(config));
281
282        for i in 0..self.devices {
283            let unit_id = (i + 1) as u8;
284            let points = (self.points_per_device / 4) as u16;
285            let device_config = ModbusDeviceConfig {
286                unit_id,
287                name: format!("Device-{}", unit_id),
288                holding_registers: points,
289                input_registers: points,
290                coils: points,
291                discrete_inputs: points,
292                response_delay_ms: 0,
293                tags: self.tags.clone(),
294            };
295            let device = ModbusDevice::new(device_config);
296            server.add_device(device);
297        }
298
299        {
300            let mut server_guard = self.server.lock().await;
301            *server_guard = Some(server.clone());
302        }
303
304        let server_clone = server.clone();
305        let task = tokio::spawn(async move {
306            if let Err(e) = server_clone.run().await {
307                tracing::error!("Modbus server error: {}", e);
308            }
309        });
310
311        {
312            let mut task_guard = self.server_task.lock().await;
313            *task_guard = Some(task);
314        }
315
316        tokio::time::sleep(Duration::from_millis(100)).await;
317
318        spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
319        Ok(())
320    }
321
322    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
323        if let Some(server) = self.server.lock().await.as_ref() {
324            server.shutdown();
325        }
326
327        if let Some(task) = self.server_task.lock().await.take() {
328            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
329        }
330
331        Ok(())
332    }
333}
334
335// =============================================================================
336// OPC UA Command
337// =============================================================================
338
339/// OPC UA protocol command.
340pub struct OpcuaCommand {
341    bind_addr: SocketAddr,
342    endpoint_path: String,
343    nodes: usize,
344    security_mode: String,
345    /// Server instance (for shutdown).
346    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
347    /// Server task handle.
348    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
349}
350
351impl OpcuaCommand {
352    pub fn new() -> Self {
353        Self {
354            bind_addr: "0.0.0.0:4840".parse().unwrap(),
355            endpoint_path: "/".into(),
356            nodes: 1000,
357            security_mode: "None".into(),
358            server: Arc::new(Mutex::new(None)),
359            server_task: Arc::new(Mutex::new(None)),
360        }
361    }
362
363    pub fn with_port(mut self, port: u16) -> Self {
364        self.bind_addr.set_port(port);
365        self
366    }
367
368    pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
369        self.endpoint_path = path.into();
370        self
371    }
372
373    pub fn with_nodes(mut self, nodes: usize) -> Self {
374        self.nodes = nodes;
375        self
376    }
377
378    pub fn with_security(mut self, mode: impl Into<String>) -> Self {
379        self.security_mode = mode.into();
380        self
381    }
382}
383
384impl Default for OpcuaCommand {
385    fn default() -> Self {
386        Self::new()
387    }
388}
389
390#[async_trait]
391impl Command for OpcuaCommand {
392    fn name(&self) -> &str {
393        "opcua"
394    }
395
396    fn description(&self) -> &str {
397        "Start an OPC UA server simulator"
398    }
399
400    fn requires_engine(&self) -> bool {
401        true
402    }
403
404    fn supports_shutdown(&self) -> bool {
405        true
406    }
407
408    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
409        let format = ctx.output().format();
410        let is_quiet = ctx.is_quiet();
411        let is_verbose = ctx.is_verbose();
412        let is_debug = ctx.is_debug();
413
414        if !is_quiet {
415            if matches!(format, OutputFormat::Table) {
416                let output = ctx.output();
417                output.header("OPC UA Simulator");
418                output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
419                output.kv("Nodes", self.nodes);
420                output.kv("Security Mode", &self.security_mode);
421            }
422        }
423
424        // Verbose: show extra details
425        if is_verbose {
426            ctx.vprintln(format!("  Bind Address: {}", self.bind_addr));
427            ctx.vprintln(format!("  Endpoint Path: {}", self.endpoint_path));
428            ctx.vprintln(format!("  Max Subscriptions: 1000"));
429            ctx.vprintln(format!("  Max Monitored Items: 10000"));
430        }
431
432        // Debug: dump full configuration
433        if is_debug {
434            ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
435            ctx.dprintln(format!("Node count: {}", self.nodes));
436            ctx.dprintln(format!("Security mode: {}", self.security_mode));
437            ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
438        }
439
440        self.start_server(ctx).await?;
441
442        if !is_quiet {
443            match format {
444                OutputFormat::Table => {
445                    let colors_enabled = ctx.colors_enabled();
446                    let table = TableBuilder::new(colors_enabled)
447                        .header(["Namespace", "Nodes", "Subscriptions", "Status"])
448                        .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
449                        .status_row(
450                            ["1", &self.nodes.to_string(), "0", "Online"],
451                            StatusType::Success,
452                        );
453                    table.print();
454                }
455                _ => {
456                    #[derive(Serialize)]
457                    struct OpcuaServerInfo {
458                        protocol: String,
459                        endpoint: String,
460                        nodes: usize,
461                        security_mode: String,
462                        namespaces: Vec<NamespaceInfo>,
463                        status: String,
464                    }
465                    #[derive(Serialize)]
466                    struct NamespaceInfo {
467                        index: u32,
468                        nodes: String,
469                        subscriptions: u32,
470                        status: String,
471                    }
472                    let info = OpcuaServerInfo {
473                        protocol: "OPC UA".into(),
474                        endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
475                        nodes: self.nodes,
476                        security_mode: self.security_mode.clone(),
477                        namespaces: vec![
478                            NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
479                            NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
480                        ],
481                        status: "Online".into(),
482                    };
483                    let _ = ctx.output().write(&info);
484                }
485            }
486        }
487
488        if !is_quiet {
489            ctx.output().info("Press Ctrl+C to stop");
490        }
491        ctx.shutdown_signal().notified().await;
492
493        self.stop_server(ctx).await?;
494        if !is_quiet {
495            ctx.output().success("OPC UA simulator stopped");
496        }
497
498        Ok(CommandOutput::quiet_success())
499    }
500}
501
502#[async_trait]
503impl ProtocolCommand for OpcuaCommand {
504    fn protocol(&self) -> Protocol {
505        Protocol::OpcUa
506    }
507
508    fn default_port(&self) -> u16 {
509        4840
510    }
511
512    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
513        let output = ctx.output();
514        let spinner = output.spinner("Starting OPC UA server...");
515
516        let config = OpcUaServerConfig {
517            endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
518            server_name: "Mabinogion OPC UA Simulator".to_string(),
519            max_subscriptions: 1000,
520            max_monitored_items: 10000,
521            ..Default::default()
522        };
523
524        let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
525            crate::error::CliError::ExecutionFailed {
526                message: format!("Failed to create OPC UA server: {}", e)
527            }
528        })?);
529
530        // Add sample nodes with diverse types and mixed read-only / writable access.
531        // Even-indexed nodes are writable, odd-indexed are read-only.
532        let node_count = self.nodes.min(100);
533        for i in 0..node_count {
534            let node_id = format!("ns=2;i={}", 1000 + i);
535            let name = format!("Variable_{}", i);
536            let value = (i as f64) * 0.1;
537
538            if i % 2 == 0 {
539                let _ = server.add_writable_variable(node_id, name, value);
540            } else {
541                let _ = server.add_variable(node_id, name, value);
542            }
543        }
544
545        {
546            let mut server_guard = self.server.lock().await;
547            *server_guard = Some(server.clone());
548        }
549
550        let server_clone = server.clone();
551        let task = tokio::spawn(async move {
552            if let Err(e) = server_clone.start().await {
553                tracing::error!("OPC UA server error: {}", e);
554            }
555        });
556
557        {
558            let mut task_guard = self.server_task.lock().await;
559            *task_guard = Some(task);
560        }
561
562        tokio::time::sleep(Duration::from_millis(100)).await;
563
564        spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
565        Ok(())
566    }
567
568    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
569        if let Some(server) = self.server.lock().await.as_ref() {
570            let _ = server.stop().await;
571        }
572
573        if let Some(task) = self.server_task.lock().await.take() {
574            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
575        }
576
577        Ok(())
578    }
579}
580
581// =============================================================================
582// BACnet Command
583// =============================================================================
584
585/// BACnet protocol command.
586pub struct BacnetCommand {
587    bind_addr: SocketAddr,
588    device_instance: u32,
589    objects: usize,
590    bbmd_enabled: bool,
591    /// Server instance (for shutdown).
592    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
593    /// Server task handle.
594    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
595}
596
597impl BacnetCommand {
598    pub fn new() -> Self {
599        Self {
600            bind_addr: "0.0.0.0:47808".parse().unwrap(),
601            device_instance: 1234,
602            objects: 100,
603            bbmd_enabled: false,
604            server: Arc::new(Mutex::new(None)),
605            server_task: Arc::new(Mutex::new(None)),
606        }
607    }
608
609    pub fn with_port(mut self, port: u16) -> Self {
610        self.bind_addr.set_port(port);
611        self
612    }
613
614    pub fn with_device_instance(mut self, instance: u32) -> Self {
615        self.device_instance = instance;
616        self
617    }
618
619    pub fn with_objects(mut self, objects: usize) -> Self {
620        self.objects = objects;
621        self
622    }
623
624    pub fn with_bbmd(mut self, enabled: bool) -> Self {
625        self.bbmd_enabled = enabled;
626        self
627    }
628}
629
630impl Default for BacnetCommand {
631    fn default() -> Self {
632        Self::new()
633    }
634}
635
636#[async_trait]
637impl Command for BacnetCommand {
638    fn name(&self) -> &str {
639        "bacnet"
640    }
641
642    fn description(&self) -> &str {
643        "Start a BACnet/IP simulator"
644    }
645
646    fn requires_engine(&self) -> bool {
647        true
648    }
649
650    fn supports_shutdown(&self) -> bool {
651        true
652    }
653
654    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
655        let format = ctx.output().format();
656        let is_quiet = ctx.is_quiet();
657        let is_verbose = ctx.is_verbose();
658        let is_debug = ctx.is_debug();
659
660        if !is_quiet {
661            if matches!(format, OutputFormat::Table) {
662                let output = ctx.output();
663                output.header("BACnet/IP Simulator");
664                output.kv("Bind Address", self.bind_addr);
665                output.kv("Device Instance", self.device_instance);
666                output.kv("Objects", self.objects);
667                output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
668            }
669        }
670
671        // Verbose: show extra details
672        if is_verbose {
673            let per_type = self.objects / 4;
674            ctx.vprintln(format!("  Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
675            ctx.vprintln(format!("  Device Name: Mabinogion BACnet Simulator"));
676        }
677
678        // Debug: dump full configuration
679        if is_debug {
680            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
681            ctx.dprintln(format!("Device instance: {}", self.device_instance));
682            ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
683        }
684
685        self.start_server(ctx).await?;
686
687        let per_type = self.objects / 4;
688
689        if !is_quiet {
690            match format {
691                OutputFormat::Table => {
692                    let colors_enabled = ctx.colors_enabled();
693                    let table = TableBuilder::new(colors_enabled)
694                        .header(["Object Type", "Count", "Status"])
695                        .status_row(["Device", "1", "Online"], StatusType::Success)
696                        .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
697                        .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
698                        .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
699                        .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
700                    table.print();
701                }
702                _ => {
703                    #[derive(Serialize)]
704                    struct BacnetServerInfo {
705                        protocol: String,
706                        bind_address: String,
707                        device_instance: u32,
708                        objects: usize,
709                        bbmd_enabled: bool,
710                        object_types: Vec<ObjectTypeInfo>,
711                        status: String,
712                    }
713                    #[derive(Serialize)]
714                    struct ObjectTypeInfo {
715                        object_type: String,
716                        count: usize,
717                        status: String,
718                    }
719                    let info = BacnetServerInfo {
720                        protocol: "BACnet/IP".into(),
721                        bind_address: self.bind_addr.to_string(),
722                        device_instance: self.device_instance,
723                        objects: self.objects,
724                        bbmd_enabled: self.bbmd_enabled,
725                        object_types: vec![
726                            ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
727                            ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
728                            ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
729                            ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
730                            ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
731                        ],
732                        status: "Online".into(),
733                    };
734                    let _ = ctx.output().write(&info);
735                }
736            }
737        }
738
739        if !is_quiet {
740            ctx.output().info("Press Ctrl+C to stop");
741        }
742        ctx.shutdown_signal().notified().await;
743
744        self.stop_server(ctx).await?;
745        if !is_quiet {
746            ctx.output().success("BACnet simulator stopped");
747        }
748
749        Ok(CommandOutput::quiet_success())
750    }
751}
752
753#[async_trait]
754impl ProtocolCommand for BacnetCommand {
755    fn protocol(&self) -> Protocol {
756        Protocol::BacnetIp
757    }
758
759    fn default_port(&self) -> u16 {
760        47808
761    }
762
763    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
764        let output = ctx.output();
765        let spinner = output.spinner("Starting BACnet server...");
766
767        let config = BacnetServerConfig::new(self.device_instance)
768            .with_bind_addr(self.bind_addr)
769            .with_device_name("Mabinogion BACnet Simulator");
770
771        // Create object registry with sample objects
772        let registry = ObjectRegistry::new();
773
774        let objects_per_type = self.objects / 4;
775        for i in 0..objects_per_type {
776            let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
777            registry.register(Arc::new(ai));
778        }
779        for i in 0..objects_per_type {
780            let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
781            registry.register(Arc::new(ao));
782        }
783        for i in 0..objects_per_type {
784            let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
785            registry.register(Arc::new(bi));
786        }
787        for i in 0..objects_per_type {
788            let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
789            registry.register(Arc::new(bo));
790        }
791
792        let server = Arc::new(BACnetServer::new(config, registry));
793
794        {
795            let mut server_guard = self.server.lock().await;
796            *server_guard = Some(server.clone());
797        }
798
799        let server_clone = server.clone();
800        let task = tokio::spawn(async move {
801            if let Err(e) = server_clone.run().await {
802                tracing::error!("BACnet server error: {}", e);
803            }
804        });
805
806        {
807            let mut task_guard = self.server_task.lock().await;
808            *task_guard = Some(task);
809        }
810
811        tokio::time::sleep(Duration::from_millis(100)).await;
812
813        spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
814        Ok(())
815    }
816
817    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
818        if let Some(server) = self.server.lock().await.as_ref() {
819            server.shutdown();
820        }
821
822        if let Some(task) = self.server_task.lock().await.take() {
823            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
824        }
825
826        Ok(())
827    }
828}
829
830// =============================================================================
831// KNX Command
832// =============================================================================
833
834/// KNX protocol command.
835pub struct KnxCommand {
836    bind_addr: SocketAddr,
837    individual_address: String,
838    group_objects: usize,
839    /// Server instance (for shutdown).
840    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
841    /// Server task handle.
842    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
843}
844
845impl KnxCommand {
846    pub fn new() -> Self {
847        Self {
848            bind_addr: "0.0.0.0:3671".parse().unwrap(),
849            individual_address: "1.1.1".into(),
850            group_objects: 100,
851            server: Arc::new(Mutex::new(None)),
852            server_task: Arc::new(Mutex::new(None)),
853        }
854    }
855
856    pub fn with_port(mut self, port: u16) -> Self {
857        self.bind_addr.set_port(port);
858        self
859    }
860
861    pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
862        self.individual_address = addr.into();
863        self
864    }
865
866    pub fn with_group_objects(mut self, count: usize) -> Self {
867        self.group_objects = count;
868        self
869    }
870}
871
872impl Default for KnxCommand {
873    fn default() -> Self {
874        Self::new()
875    }
876}
877
878#[async_trait]
879impl Command for KnxCommand {
880    fn name(&self) -> &str {
881        "knx"
882    }
883
884    fn description(&self) -> &str {
885        "Start a KNXnet/IP simulator"
886    }
887
888    fn requires_engine(&self) -> bool {
889        true
890    }
891
892    fn supports_shutdown(&self) -> bool {
893        true
894    }
895
896    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
897        let format = ctx.output().format();
898        let is_quiet = ctx.is_quiet();
899        let is_verbose = ctx.is_verbose();
900        let is_debug = ctx.is_debug();
901
902        if !is_quiet {
903            if matches!(format, OutputFormat::Table) {
904                let output = ctx.output();
905                output.header("KNXnet/IP Simulator");
906                output.kv("Bind Address", self.bind_addr);
907                output.kv("Individual Address", &self.individual_address);
908                output.kv("Group Objects", self.group_objects);
909            }
910        }
911
912        // Verbose: show extra details
913        if is_verbose {
914            ctx.vprintln(format!("  Max Connections: 10"));
915            ctx.vprintln(format!("  Services: Core, Device Management, Tunneling"));
916        }
917
918        // Debug: dump full configuration
919        if is_debug {
920            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
921            ctx.dprintln(format!("Individual address: {}", self.individual_address));
922            ctx.dprintln(format!("Group objects: {}", self.group_objects));
923        }
924
925        self.start_server(ctx).await?;
926
927        if !is_quiet {
928            match format {
929                OutputFormat::Table => {
930                    let colors_enabled = ctx.colors_enabled();
931                    let table = TableBuilder::new(colors_enabled)
932                        .header(["Service", "Status"])
933                        .status_row(["Core", "Ready"], StatusType::Success)
934                        .status_row(["Device Management", "Ready"], StatusType::Success)
935                        .status_row(["Tunneling", "Ready"], StatusType::Success);
936                    table.print();
937                }
938                _ => {
939                    #[derive(Serialize)]
940                    struct KnxServerInfo {
941                        protocol: String,
942                        bind_address: String,
943                        individual_address: String,
944                        group_objects: usize,
945                        services: Vec<ServiceInfo>,
946                        status: String,
947                    }
948                    #[derive(Serialize)]
949                    struct ServiceInfo {
950                        service: String,
951                        status: String,
952                    }
953                    let info = KnxServerInfo {
954                        protocol: "KNXnet/IP".into(),
955                        bind_address: self.bind_addr.to_string(),
956                        individual_address: self.individual_address.clone(),
957                        group_objects: self.group_objects,
958                        services: vec![
959                            ServiceInfo { service: "Core".into(), status: "Ready".into() },
960                            ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
961                            ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
962                        ],
963                        status: "Online".into(),
964                    };
965                    let _ = ctx.output().write(&info);
966                }
967            }
968        }
969
970        if !is_quiet {
971            ctx.output().info("Press Ctrl+C to stop");
972        }
973        ctx.shutdown_signal().notified().await;
974
975        self.stop_server(ctx).await?;
976        if !is_quiet {
977            ctx.output().success("KNX simulator stopped");
978        }
979
980        Ok(CommandOutput::quiet_success())
981    }
982}
983
984#[async_trait]
985impl ProtocolCommand for KnxCommand {
986    fn protocol(&self) -> Protocol {
987        Protocol::KnxIp
988    }
989
990    fn default_port(&self) -> u16 {
991        3671
992    }
993
994    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
995        let output = ctx.output();
996        let spinner = output.spinner("Starting KNX server...");
997
998        // Parse individual address
999        let individual_address: IndividualAddress = self.individual_address.parse()
1000            .map_err(|_| crate::error::CliError::ExecutionFailed {
1001                message: format!("Invalid individual address: {}", self.individual_address)
1002            })?;
1003
1004        let config = KnxServerConfig {
1005            bind_addr: self.bind_addr,
1006            individual_address,
1007            max_connections: 256,
1008            ..Default::default()
1009        };
1010
1011        // Create group objects based on --groups parameter
1012        let group_table = Arc::new(GroupObjectTable::new());
1013        let dpt_types = [
1014            DptId::new(1, 1),   // Switch (bool)
1015            DptId::new(5, 1),   // Scaling (0-100%)
1016            DptId::new(9, 1),   // Temperature (float16)
1017            DptId::new(9, 4),   // Lux
1018            DptId::new(9, 7),   // Humidity
1019            DptId::new(12, 1),  // Counter (u32)
1020            DptId::new(13, 1),  // Counter signed (i32)
1021            DptId::new(14, 56), // Float (f32)
1022        ];
1023        let dpt_names = [
1024            "Switch", "Scaling", "Temperature", "Lux",
1025            "Humidity", "Counter", "SignedCounter", "Float",
1026        ];
1027
1028        for i in 0..self.group_objects {
1029            let main = ((i / 256) + 1) as u8;
1030            let middle = ((i / 8) % 8) as u8;
1031            let sub = (i % 256) as u8;
1032            let addr = GroupAddress::three_level(main, middle, sub);
1033            let dpt_idx = i % dpt_types.len();
1034            let name = format!("{}_{}", dpt_names[dpt_idx], i);
1035            if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1036                tracing::warn!("Failed to create group object {}: {}", i, e);
1037            }
1038        }
1039
1040        let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1041
1042        {
1043            let mut server_guard = self.server.lock().await;
1044            *server_guard = Some(server.clone());
1045        }
1046
1047        let server_clone = server.clone();
1048        let task = tokio::spawn(async move {
1049            if let Err(e) = server_clone.start().await {
1050                tracing::error!("KNX server error: {}", e);
1051            }
1052        });
1053
1054        {
1055            let mut task_guard = self.server_task.lock().await;
1056            *task_guard = Some(task);
1057        }
1058
1059        tokio::time::sleep(Duration::from_millis(100)).await;
1060
1061        spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1062        Ok(())
1063    }
1064
1065    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1066        // Take server out to call stop (KnxServer::stop has Send issues with parking_lot)
1067        let server_opt = self.server.lock().await.take();
1068        if let Some(server) = server_opt {
1069            // Use spawn_blocking to handle the non-Send future
1070            let _ = tokio::task::spawn_blocking(move || {
1071                let rt = tokio::runtime::Handle::current();
1072                rt.block_on(async {
1073                    let _ = server.stop().await;
1074                })
1075            }).await;
1076        }
1077
1078        if let Some(task) = self.server_task.lock().await.take() {
1079            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1080        }
1081
1082        Ok(())
1083    }
1084}