Skip to main content

mabi_cli/commands/
protocol.rs

1//! Protocol-specific commands.
2//!
3//! Provides subcommands for each supported protocol.
4
5use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use serde::Serialize;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17
18// Protocol-specific imports
19use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
20use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
21use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
22use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress};
23
/// Base trait for protocol-specific commands.
///
/// Extends [`Command`] with the pieces every protocol simulator in this
/// module provides: which protocol it speaks, which port it uses by default,
/// and how its server is started and stopped.
#[async_trait]
pub trait ProtocolCommand: Command {
    /// Get the protocol type.
    fn protocol(&self) -> Protocol;

    /// Get the default port.
    fn default_port(&self) -> u16;

    /// Start the protocol server.
    ///
    /// The implementations in this file spawn the server onto a Tokio task
    /// and keep the handle so that [`stop_server`](Self::stop_server) can
    /// wait for it later.
    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;

    /// Stop the protocol server.
    ///
    /// The implementations in this file signal the server to shut down and
    /// then wait a bounded time for the spawned task to finish.
    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
}
39
40// =============================================================================
41// Modbus Command
42// =============================================================================
43
44/// Modbus protocol command.
45pub struct ModbusCommand {
46    /// Binding address.
47    bind_addr: SocketAddr,
48    /// Number of devices to simulate.
49    devices: usize,
50    /// Points per device.
51    points_per_device: usize,
52    /// Use RTU mode instead of TCP.
53    rtu_mode: bool,
54    /// Serial port for RTU mode.
55    serial_port: Option<String>,
56    /// Server instance (for shutdown).
57    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
58    /// Server task handle.
59    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
60}
61
62impl ModbusCommand {
63    pub fn new() -> Self {
64        Self {
65            bind_addr: "0.0.0.0:502".parse().unwrap(),
66            devices: 1,
67            points_per_device: 100,
68            rtu_mode: false,
69            serial_port: None,
70            server: Arc::new(Mutex::new(None)),
71            server_task: Arc::new(Mutex::new(None)),
72        }
73    }
74
75    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
76        self.bind_addr = addr;
77        self
78    }
79
80    pub fn with_port(mut self, port: u16) -> Self {
81        self.bind_addr.set_port(port);
82        self
83    }
84
85    pub fn with_devices(mut self, devices: usize) -> Self {
86        self.devices = devices;
87        self
88    }
89
90    pub fn with_points(mut self, points: usize) -> Self {
91        self.points_per_device = points;
92        self
93    }
94
95    pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
96        self.rtu_mode = true;
97        self.serial_port = Some(serial_port.into());
98        self
99    }
100}
101
102impl Default for ModbusCommand {
103    fn default() -> Self {
104        Self::new()
105    }
106}
107
108#[async_trait]
109impl Command for ModbusCommand {
110    fn name(&self) -> &str {
111        "modbus"
112    }
113
114    fn description(&self) -> &str {
115        "Start a Modbus TCP/RTU simulator"
116    }
117
118    fn requires_engine(&self) -> bool {
119        true
120    }
121
122    fn supports_shutdown(&self) -> bool {
123        true
124    }
125
126    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
127        let format = ctx.output().format();
128        let is_quiet = ctx.is_quiet();
129        let is_verbose = ctx.is_verbose();
130        let is_debug = ctx.is_debug();
131
132        if !is_quiet {
133            if matches!(format, OutputFormat::Table) {
134                let output = ctx.output();
135                if self.rtu_mode {
136                    output.header("Modbus RTU Simulator");
137                    output.kv(
138                        "Serial Port",
139                        self.serial_port.as_deref().unwrap_or("N/A"),
140                    );
141                } else {
142                    output.header("Modbus TCP Simulator");
143                    output.kv("Bind Address", self.bind_addr);
144                }
145                output.kv("Devices", self.devices);
146                output.kv("Points per Device", self.points_per_device);
147                output.kv("Total Points", self.devices * self.points_per_device);
148            }
149        }
150
151        // Verbose: show extra configuration details
152        if is_verbose {
153            ctx.vprintln(format!("  Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
154            ctx.vprintln(format!("  Points Distribution: {} per register type", self.points_per_device / 4));
155        }
156
157        // Debug: dump full configuration
158        if is_debug {
159            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
160            ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
161            ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
162        }
163
164        self.start_server(ctx).await?;
165
166        let points_per_type = self.points_per_device / 4;
167
168        if !is_quiet {
169            match format {
170                OutputFormat::Table => {
171                    let colors_enabled = ctx.colors_enabled();
172                    let builder = TableBuilder::new(colors_enabled)
173                        .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
174
175                    let devices = self.devices;
176                    let pts = points_per_type.to_string();
177                    let table = PaginatedTable::default().render(builder, devices, 6, |i| {
178                        let unit_id = (i + 1).to_string();
179                        (
180                            vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
181                            StatusType::Success,
182                        )
183                    });
184                    table.print();
185                }
186                _ => {
187                    #[derive(Serialize)]
188                    struct ModbusServerInfo {
189                        protocol: String,
190                        bind_address: String,
191                        devices: usize,
192                        points_per_device: usize,
193                        total_points: usize,
194                        rtu_mode: bool,
195                        serial_port: Option<String>,
196                        device_list: Vec<ModbusDeviceInfo>,
197                        status: String,
198                    }
199                    #[derive(Serialize)]
200                    struct ModbusDeviceInfo {
201                        unit_id: usize,
202                        holding_registers: usize,
203                        input_registers: usize,
204                        coils: usize,
205                        discrete_inputs: usize,
206                        status: String,
207                    }
208                    let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
209                        .map(|i| ModbusDeviceInfo {
210                            unit_id: i + 1,
211                            holding_registers: points_per_type,
212                            input_registers: points_per_type,
213                            coils: points_per_type,
214                            discrete_inputs: points_per_type,
215                            status: "Online".into(),
216                        })
217                        .collect();
218                    let info = ModbusServerInfo {
219                        protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
220                        bind_address: self.bind_addr.to_string(),
221                        devices: self.devices,
222                        points_per_device: self.points_per_device,
223                        total_points: self.devices * self.points_per_device,
224                        rtu_mode: self.rtu_mode,
225                        serial_port: self.serial_port.clone(),
226                        device_list,
227                        status: "Online".into(),
228                    };
229                    let _ = ctx.output().write(&info);
230                }
231            }
232        }
233
234        if !is_quiet {
235            ctx.output().info("Press Ctrl+C to stop");
236        }
237        ctx.shutdown_signal().notified().await;
238
239        self.stop_server(ctx).await?;
240        if !is_quiet {
241            ctx.output().success("Modbus simulator stopped");
242        }
243
244        Ok(CommandOutput::quiet_success())
245    }
246}
247
248#[async_trait]
249impl ProtocolCommand for ModbusCommand {
250    fn protocol(&self) -> Protocol {
251        if self.rtu_mode {
252            Protocol::ModbusRtu
253        } else {
254            Protocol::ModbusTcp
255        }
256    }
257
258    fn default_port(&self) -> u16 {
259        502
260    }
261
262    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
263        let output = ctx.output();
264        let spinner = output.spinner("Starting Modbus server...");
265
266        let config = ServerConfigV2 {
267            bind_address: self.bind_addr,
268            ..Default::default()
269        };
270
271        let server = Arc::new(ModbusTcpServerV2::new(config));
272
273        for i in 0..self.devices {
274            let unit_id = (i + 1) as u8;
275            let points = (self.points_per_device / 4) as u16;
276            let device_config = ModbusDeviceConfig {
277                unit_id,
278                name: format!("Device-{}", unit_id),
279                holding_registers: points,
280                input_registers: points,
281                coils: points,
282                discrete_inputs: points,
283                response_delay_ms: 0,
284            };
285            let device = ModbusDevice::new(device_config);
286            server.add_device(device);
287        }
288
289        {
290            let mut server_guard = self.server.lock().await;
291            *server_guard = Some(server.clone());
292        }
293
294        let server_clone = server.clone();
295        let task = tokio::spawn(async move {
296            if let Err(e) = server_clone.run().await {
297                tracing::error!("Modbus server error: {}", e);
298            }
299        });
300
301        {
302            let mut task_guard = self.server_task.lock().await;
303            *task_guard = Some(task);
304        }
305
306        tokio::time::sleep(Duration::from_millis(100)).await;
307
308        spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
309        Ok(())
310    }
311
312    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
313        if let Some(server) = self.server.lock().await.as_ref() {
314            server.shutdown();
315        }
316
317        if let Some(task) = self.server_task.lock().await.take() {
318            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
319        }
320
321        Ok(())
322    }
323}
324
325// =============================================================================
326// OPC UA Command
327// =============================================================================
328
329/// OPC UA protocol command.
330pub struct OpcuaCommand {
331    bind_addr: SocketAddr,
332    endpoint_path: String,
333    nodes: usize,
334    security_mode: String,
335    /// Server instance (for shutdown).
336    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
337    /// Server task handle.
338    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
339}
340
341impl OpcuaCommand {
342    pub fn new() -> Self {
343        Self {
344            bind_addr: "0.0.0.0:4840".parse().unwrap(),
345            endpoint_path: "/".into(),
346            nodes: 1000,
347            security_mode: "None".into(),
348            server: Arc::new(Mutex::new(None)),
349            server_task: Arc::new(Mutex::new(None)),
350        }
351    }
352
353    pub fn with_port(mut self, port: u16) -> Self {
354        self.bind_addr.set_port(port);
355        self
356    }
357
358    pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
359        self.endpoint_path = path.into();
360        self
361    }
362
363    pub fn with_nodes(mut self, nodes: usize) -> Self {
364        self.nodes = nodes;
365        self
366    }
367
368    pub fn with_security(mut self, mode: impl Into<String>) -> Self {
369        self.security_mode = mode.into();
370        self
371    }
372}
373
374impl Default for OpcuaCommand {
375    fn default() -> Self {
376        Self::new()
377    }
378}
379
380#[async_trait]
381impl Command for OpcuaCommand {
382    fn name(&self) -> &str {
383        "opcua"
384    }
385
386    fn description(&self) -> &str {
387        "Start an OPC UA server simulator"
388    }
389
390    fn requires_engine(&self) -> bool {
391        true
392    }
393
394    fn supports_shutdown(&self) -> bool {
395        true
396    }
397
398    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
399        let format = ctx.output().format();
400        let is_quiet = ctx.is_quiet();
401        let is_verbose = ctx.is_verbose();
402        let is_debug = ctx.is_debug();
403
404        if !is_quiet {
405            if matches!(format, OutputFormat::Table) {
406                let output = ctx.output();
407                output.header("OPC UA Simulator");
408                output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
409                output.kv("Nodes", self.nodes);
410                output.kv("Security Mode", &self.security_mode);
411            }
412        }
413
414        // Verbose: show extra details
415        if is_verbose {
416            ctx.vprintln(format!("  Bind Address: {}", self.bind_addr));
417            ctx.vprintln(format!("  Endpoint Path: {}", self.endpoint_path));
418            ctx.vprintln(format!("  Max Subscriptions: 1000"));
419            ctx.vprintln(format!("  Max Monitored Items: 10000"));
420        }
421
422        // Debug: dump full configuration
423        if is_debug {
424            ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
425            ctx.dprintln(format!("Node count: {}", self.nodes));
426            ctx.dprintln(format!("Security mode: {}", self.security_mode));
427            ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
428        }
429
430        self.start_server(ctx).await?;
431
432        if !is_quiet {
433            match format {
434                OutputFormat::Table => {
435                    let colors_enabled = ctx.colors_enabled();
436                    let table = TableBuilder::new(colors_enabled)
437                        .header(["Namespace", "Nodes", "Subscriptions", "Status"])
438                        .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
439                        .status_row(
440                            ["1", &self.nodes.to_string(), "0", "Online"],
441                            StatusType::Success,
442                        );
443                    table.print();
444                }
445                _ => {
446                    #[derive(Serialize)]
447                    struct OpcuaServerInfo {
448                        protocol: String,
449                        endpoint: String,
450                        nodes: usize,
451                        security_mode: String,
452                        namespaces: Vec<NamespaceInfo>,
453                        status: String,
454                    }
455                    #[derive(Serialize)]
456                    struct NamespaceInfo {
457                        index: u32,
458                        nodes: String,
459                        subscriptions: u32,
460                        status: String,
461                    }
462                    let info = OpcuaServerInfo {
463                        protocol: "OPC UA".into(),
464                        endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
465                        nodes: self.nodes,
466                        security_mode: self.security_mode.clone(),
467                        namespaces: vec![
468                            NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
469                            NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
470                        ],
471                        status: "Online".into(),
472                    };
473                    let _ = ctx.output().write(&info);
474                }
475            }
476        }
477
478        if !is_quiet {
479            ctx.output().info("Press Ctrl+C to stop");
480        }
481        ctx.shutdown_signal().notified().await;
482
483        self.stop_server(ctx).await?;
484        if !is_quiet {
485            ctx.output().success("OPC UA simulator stopped");
486        }
487
488        Ok(CommandOutput::quiet_success())
489    }
490}
491
492#[async_trait]
493impl ProtocolCommand for OpcuaCommand {
494    fn protocol(&self) -> Protocol {
495        Protocol::OpcUa
496    }
497
498    fn default_port(&self) -> u16 {
499        4840
500    }
501
502    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
503        let output = ctx.output();
504        let spinner = output.spinner("Starting OPC UA server...");
505
506        let config = OpcUaServerConfig {
507            endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
508            server_name: "Mabinogion OPC UA Simulator".to_string(),
509            max_subscriptions: 1000,
510            max_monitored_items: 10000,
511            ..Default::default()
512        };
513
514        let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
515            crate::error::CliError::ExecutionFailed {
516                message: format!("Failed to create OPC UA server: {}", e)
517            }
518        })?);
519
520        // Add sample nodes
521        for i in 0..self.nodes.min(100) {
522            let node_id = format!("ns=2;i={}", 1000 + i);
523            let _ = server.add_variable(node_id, format!("Variable_{}", i), (i as f64) * 0.1);
524        }
525
526        {
527            let mut server_guard = self.server.lock().await;
528            *server_guard = Some(server.clone());
529        }
530
531        let server_clone = server.clone();
532        let task = tokio::spawn(async move {
533            if let Err(e) = server_clone.start().await {
534                tracing::error!("OPC UA server error: {}", e);
535            }
536        });
537
538        {
539            let mut task_guard = self.server_task.lock().await;
540            *task_guard = Some(task);
541        }
542
543        tokio::time::sleep(Duration::from_millis(100)).await;
544
545        spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
546        Ok(())
547    }
548
549    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
550        if let Some(server) = self.server.lock().await.as_ref() {
551            let _ = server.stop().await;
552        }
553
554        if let Some(task) = self.server_task.lock().await.take() {
555            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
556        }
557
558        Ok(())
559    }
560}
561
562// =============================================================================
563// BACnet Command
564// =============================================================================
565
566/// BACnet protocol command.
567pub struct BacnetCommand {
568    bind_addr: SocketAddr,
569    device_instance: u32,
570    objects: usize,
571    bbmd_enabled: bool,
572    /// Server instance (for shutdown).
573    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
574    /// Server task handle.
575    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
576}
577
578impl BacnetCommand {
579    pub fn new() -> Self {
580        Self {
581            bind_addr: "0.0.0.0:47808".parse().unwrap(),
582            device_instance: 1234,
583            objects: 100,
584            bbmd_enabled: false,
585            server: Arc::new(Mutex::new(None)),
586            server_task: Arc::new(Mutex::new(None)),
587        }
588    }
589
590    pub fn with_port(mut self, port: u16) -> Self {
591        self.bind_addr.set_port(port);
592        self
593    }
594
595    pub fn with_device_instance(mut self, instance: u32) -> Self {
596        self.device_instance = instance;
597        self
598    }
599
600    pub fn with_objects(mut self, objects: usize) -> Self {
601        self.objects = objects;
602        self
603    }
604
605    pub fn with_bbmd(mut self, enabled: bool) -> Self {
606        self.bbmd_enabled = enabled;
607        self
608    }
609}
610
611impl Default for BacnetCommand {
612    fn default() -> Self {
613        Self::new()
614    }
615}
616
617#[async_trait]
618impl Command for BacnetCommand {
619    fn name(&self) -> &str {
620        "bacnet"
621    }
622
623    fn description(&self) -> &str {
624        "Start a BACnet/IP simulator"
625    }
626
627    fn requires_engine(&self) -> bool {
628        true
629    }
630
631    fn supports_shutdown(&self) -> bool {
632        true
633    }
634
635    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
636        let format = ctx.output().format();
637        let is_quiet = ctx.is_quiet();
638        let is_verbose = ctx.is_verbose();
639        let is_debug = ctx.is_debug();
640
641        if !is_quiet {
642            if matches!(format, OutputFormat::Table) {
643                let output = ctx.output();
644                output.header("BACnet/IP Simulator");
645                output.kv("Bind Address", self.bind_addr);
646                output.kv("Device Instance", self.device_instance);
647                output.kv("Objects", self.objects);
648                output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
649            }
650        }
651
652        // Verbose: show extra details
653        if is_verbose {
654            let per_type = self.objects / 4;
655            ctx.vprintln(format!("  Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
656            ctx.vprintln(format!("  Device Name: Mabinogion BACnet Simulator"));
657        }
658
659        // Debug: dump full configuration
660        if is_debug {
661            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
662            ctx.dprintln(format!("Device instance: {}", self.device_instance));
663            ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
664        }
665
666        self.start_server(ctx).await?;
667
668        let per_type = self.objects / 4;
669
670        if !is_quiet {
671            match format {
672                OutputFormat::Table => {
673                    let colors_enabled = ctx.colors_enabled();
674                    let table = TableBuilder::new(colors_enabled)
675                        .header(["Object Type", "Count", "Status"])
676                        .status_row(["Device", "1", "Online"], StatusType::Success)
677                        .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
678                        .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
679                        .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
680                        .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
681                    table.print();
682                }
683                _ => {
684                    #[derive(Serialize)]
685                    struct BacnetServerInfo {
686                        protocol: String,
687                        bind_address: String,
688                        device_instance: u32,
689                        objects: usize,
690                        bbmd_enabled: bool,
691                        object_types: Vec<ObjectTypeInfo>,
692                        status: String,
693                    }
694                    #[derive(Serialize)]
695                    struct ObjectTypeInfo {
696                        object_type: String,
697                        count: usize,
698                        status: String,
699                    }
700                    let info = BacnetServerInfo {
701                        protocol: "BACnet/IP".into(),
702                        bind_address: self.bind_addr.to_string(),
703                        device_instance: self.device_instance,
704                        objects: self.objects,
705                        bbmd_enabled: self.bbmd_enabled,
706                        object_types: vec![
707                            ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
708                            ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
709                            ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
710                            ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
711                            ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
712                        ],
713                        status: "Online".into(),
714                    };
715                    let _ = ctx.output().write(&info);
716                }
717            }
718        }
719
720        if !is_quiet {
721            ctx.output().info("Press Ctrl+C to stop");
722        }
723        ctx.shutdown_signal().notified().await;
724
725        self.stop_server(ctx).await?;
726        if !is_quiet {
727            ctx.output().success("BACnet simulator stopped");
728        }
729
730        Ok(CommandOutput::quiet_success())
731    }
732}
733
734#[async_trait]
735impl ProtocolCommand for BacnetCommand {
736    fn protocol(&self) -> Protocol {
737        Protocol::BacnetIp
738    }
739
740    fn default_port(&self) -> u16 {
741        47808
742    }
743
744    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
745        let output = ctx.output();
746        let spinner = output.spinner("Starting BACnet server...");
747
748        let config = BacnetServerConfig::new(self.device_instance)
749            .with_bind_addr(self.bind_addr)
750            .with_device_name("Mabinogion BACnet Simulator");
751
752        // Create object registry with sample objects
753        let registry = ObjectRegistry::new();
754
755        let objects_per_type = self.objects / 4;
756        for i in 0..objects_per_type {
757            let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
758            registry.register(Arc::new(ai));
759        }
760        for i in 0..objects_per_type {
761            let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
762            registry.register(Arc::new(ao));
763        }
764        for i in 0..objects_per_type {
765            let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
766            registry.register(Arc::new(bi));
767        }
768        for i in 0..objects_per_type {
769            let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
770            registry.register(Arc::new(bo));
771        }
772
773        let server = Arc::new(BACnetServer::new(config, registry));
774
775        {
776            let mut server_guard = self.server.lock().await;
777            *server_guard = Some(server.clone());
778        }
779
780        let server_clone = server.clone();
781        let task = tokio::spawn(async move {
782            if let Err(e) = server_clone.run().await {
783                tracing::error!("BACnet server error: {}", e);
784            }
785        });
786
787        {
788            let mut task_guard = self.server_task.lock().await;
789            *task_guard = Some(task);
790        }
791
792        tokio::time::sleep(Duration::from_millis(100)).await;
793
794        spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
795        Ok(())
796    }
797
798    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
799        if let Some(server) = self.server.lock().await.as_ref() {
800            server.shutdown();
801        }
802
803        if let Some(task) = self.server_task.lock().await.take() {
804            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
805        }
806
807        Ok(())
808    }
809}
810
811// =============================================================================
812// KNX Command
813// =============================================================================
814
815/// KNX protocol command.
816pub struct KnxCommand {
817    bind_addr: SocketAddr,
818    individual_address: String,
819    group_objects: usize,
820    /// Server instance (for shutdown).
821    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
822    /// Server task handle.
823    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
824}
825
826impl KnxCommand {
827    pub fn new() -> Self {
828        Self {
829            bind_addr: "0.0.0.0:3671".parse().unwrap(),
830            individual_address: "1.1.1".into(),
831            group_objects: 100,
832            server: Arc::new(Mutex::new(None)),
833            server_task: Arc::new(Mutex::new(None)),
834        }
835    }
836
837    pub fn with_port(mut self, port: u16) -> Self {
838        self.bind_addr.set_port(port);
839        self
840    }
841
842    pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
843        self.individual_address = addr.into();
844        self
845    }
846
847    pub fn with_group_objects(mut self, count: usize) -> Self {
848        self.group_objects = count;
849        self
850    }
851}
852
853impl Default for KnxCommand {
854    fn default() -> Self {
855        Self::new()
856    }
857}
858
859#[async_trait]
860impl Command for KnxCommand {
861    fn name(&self) -> &str {
862        "knx"
863    }
864
865    fn description(&self) -> &str {
866        "Start a KNXnet/IP simulator"
867    }
868
869    fn requires_engine(&self) -> bool {
870        true
871    }
872
873    fn supports_shutdown(&self) -> bool {
874        true
875    }
876
877    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
878        let format = ctx.output().format();
879        let is_quiet = ctx.is_quiet();
880        let is_verbose = ctx.is_verbose();
881        let is_debug = ctx.is_debug();
882
883        if !is_quiet {
884            if matches!(format, OutputFormat::Table) {
885                let output = ctx.output();
886                output.header("KNXnet/IP Simulator");
887                output.kv("Bind Address", self.bind_addr);
888                output.kv("Individual Address", &self.individual_address);
889                output.kv("Group Objects", self.group_objects);
890            }
891        }
892
893        // Verbose: show extra details
894        if is_verbose {
895            ctx.vprintln(format!("  Max Connections: 10"));
896            ctx.vprintln(format!("  Services: Core, Device Management, Tunneling"));
897        }
898
899        // Debug: dump full configuration
900        if is_debug {
901            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
902            ctx.dprintln(format!("Individual address: {}", self.individual_address));
903            ctx.dprintln(format!("Group objects: {}", self.group_objects));
904        }
905
906        self.start_server(ctx).await?;
907
908        if !is_quiet {
909            match format {
910                OutputFormat::Table => {
911                    let colors_enabled = ctx.colors_enabled();
912                    let table = TableBuilder::new(colors_enabled)
913                        .header(["Service", "Status"])
914                        .status_row(["Core", "Ready"], StatusType::Success)
915                        .status_row(["Device Management", "Ready"], StatusType::Success)
916                        .status_row(["Tunneling", "Ready"], StatusType::Success);
917                    table.print();
918                }
919                _ => {
920                    #[derive(Serialize)]
921                    struct KnxServerInfo {
922                        protocol: String,
923                        bind_address: String,
924                        individual_address: String,
925                        group_objects: usize,
926                        services: Vec<ServiceInfo>,
927                        status: String,
928                    }
929                    #[derive(Serialize)]
930                    struct ServiceInfo {
931                        service: String,
932                        status: String,
933                    }
934                    let info = KnxServerInfo {
935                        protocol: "KNXnet/IP".into(),
936                        bind_address: self.bind_addr.to_string(),
937                        individual_address: self.individual_address.clone(),
938                        group_objects: self.group_objects,
939                        services: vec![
940                            ServiceInfo { service: "Core".into(), status: "Ready".into() },
941                            ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
942                            ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
943                        ],
944                        status: "Online".into(),
945                    };
946                    let _ = ctx.output().write(&info);
947                }
948            }
949        }
950
951        if !is_quiet {
952            ctx.output().info("Press Ctrl+C to stop");
953        }
954        ctx.shutdown_signal().notified().await;
955
956        self.stop_server(ctx).await?;
957        if !is_quiet {
958            ctx.output().success("KNX simulator stopped");
959        }
960
961        Ok(CommandOutput::quiet_success())
962    }
963}
964
965#[async_trait]
966impl ProtocolCommand for KnxCommand {
967    fn protocol(&self) -> Protocol {
968        Protocol::KnxIp
969    }
970
971    fn default_port(&self) -> u16 {
972        3671
973    }
974
975    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
976        let output = ctx.output();
977        let spinner = output.spinner("Starting KNX server...");
978
979        // Parse individual address
980        let individual_address: IndividualAddress = self.individual_address.parse()
981            .map_err(|_| crate::error::CliError::ExecutionFailed {
982                message: format!("Invalid individual address: {}", self.individual_address)
983            })?;
984
985        let config = KnxServerConfig {
986            bind_addr: self.bind_addr,
987            individual_address,
988            max_connections: 10,
989            ..Default::default()
990        };
991
992        let server = Arc::new(KnxServer::new(config));
993
994        {
995            let mut server_guard = self.server.lock().await;
996            *server_guard = Some(server.clone());
997        }
998
999        let server_clone = server.clone();
1000        let task = tokio::spawn(async move {
1001            if let Err(e) = server_clone.start().await {
1002                tracing::error!("KNX server error: {}", e);
1003            }
1004        });
1005
1006        {
1007            let mut task_guard = self.server_task.lock().await;
1008            *task_guard = Some(task);
1009        }
1010
1011        tokio::time::sleep(Duration::from_millis(100)).await;
1012
1013        spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1014        Ok(())
1015    }
1016
1017    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1018        // Take server out to call stop (KnxServer::stop has Send issues with parking_lot)
1019        let server_opt = self.server.lock().await.take();
1020        if let Some(server) = server_opt {
1021            // Use spawn_blocking to handle the non-Send future
1022            let _ = tokio::task::spawn_blocking(move || {
1023                let rt = tokio::runtime::Handle::current();
1024                rt.block_on(async {
1025                    let _ = server.stop().await;
1026                })
1027            }).await;
1028        }
1029
1030        if let Some(task) = self.server_task.lock().await.take() {
1031            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1032        }
1033
1034        Ok(())
1035    }
1036}