Skip to main content

mabi_cli/commands/
protocol.rs

1//! Protocol-specific commands.
2//!
3//! Provides subcommands for each supported protocol.
4
5use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19// Protocol-specific imports
20use mabi_bacnet::prelude::{
21    default_object_descriptors, BACnetServer, ObjectRegistry, ServerConfig as BacnetServerConfig,
22};
23use mabi_knx::{
24    DptId, GroupAddress, GroupObjectTable, IndividualAddress, KnxServer, KnxServerConfig,
25};
26use mabi_modbus::{tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig, ModbusTcpServerV2};
27use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
28
29/// Checks if a port is already in use and provides diagnostics.
30///
31/// This is an advisory check — it warns but does not block. If the port is held by
32/// a zombie/suspended process, it provides specific diagnostic instructions.
33async fn check_port_availability(addr: SocketAddr) {
34    use tokio::io::{AsyncReadExt, AsyncWriteExt};
35    use tokio::net::TcpStream;
36
37    // Try to connect to the port
38    let connect_result =
39        tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await;
40
41    match connect_result {
42        Ok(Ok(_first_stream)) => {
43            // Port is in use — something is listening. Try a Modbus probe.
44            drop(_first_stream);
45
46            let probe_result = tokio::time::timeout(Duration::from_secs(1), async {
47                let mut stream = TcpStream::connect(addr).await?;
48                // Send Modbus TCP ReadHoldingRegisters: txn=1, proto=0, len=6, unit=1, fc=3, addr=0, count=1
49                let request: [u8; 12] = [
50                    0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x03, 0x00, 0x00, 0x00, 0x01,
51                ];
52                stream.write_all(&request).await?;
53                let mut response = [0u8; 32];
54                let n = stream.read(&mut response).await?;
55                Ok::<_, std::io::Error>(n)
56            })
57            .await;
58
59            match probe_result {
60                Ok(Ok(n)) if n >= 7 => {
61                    tracing::warn!(
62                        port = addr.port(),
63                        "Port {} is already in use by a responding Modbus server. \
64                         The new server will fail to bind.",
65                        addr.port()
66                    );
67                }
68                _ => {
69                    // TCP connects but Modbus doesn't respond — possible zombie
70                    tracing::warn!(
71                        port = addr.port(),
72                        "Port {} is in use: TCP connects but no Modbus response. \
73                         This may be a suspended (zombie) process holding the port.\n  \
74                         Diagnostic: lsof -i :{} | grep LISTEN\n  \
75                         To kill:    kill $(lsof -ti :{} -sTCP:LISTEN)",
76                        addr.port(),
77                        addr.port(),
78                        addr.port()
79                    );
80                }
81            }
82        }
83        Ok(Err(_)) | Err(_) => {
84            // Port is available (connection refused or timeout) — good
85            tracing::debug!(port = addr.port(), "Port {} is available", addr.port());
86        }
87    }
88}
89
90/// Base trait for protocol-specific commands.
91#[async_trait]
92pub trait ProtocolCommand: Command {
93    /// Get the protocol type.
94    fn protocol(&self) -> Protocol;
95
96    /// Get the default port.
97    fn default_port(&self) -> u16;
98
99    /// Start the protocol server.
100    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
101
102    /// Stop the protocol server.
103    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
104}
105
106// =============================================================================
107// Modbus Command
108// =============================================================================
109
110/// Modbus protocol command.
111pub struct ModbusCommand {
112    /// Binding address.
113    bind_addr: SocketAddr,
114    /// Number of devices to simulate.
115    devices: usize,
116    /// Points per device.
117    points_per_device: usize,
118    /// Use RTU mode instead of TCP.
119    rtu_mode: bool,
120    /// Serial port for RTU mode.
121    serial_port: Option<String>,
122    /// Device tags.
123    tags: Tags,
124    /// Server instance (for shutdown).
125    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
126    /// Server task handle.
127    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
128}
129
130impl ModbusCommand {
131    pub fn new() -> Self {
132        Self {
133            bind_addr: "0.0.0.0:502".parse().unwrap(),
134            devices: 1,
135            points_per_device: 100,
136            rtu_mode: false,
137            serial_port: None,
138            tags: Tags::new(),
139            server: Arc::new(Mutex::new(None)),
140            server_task: Arc::new(Mutex::new(None)),
141        }
142    }
143
144    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
145        self.bind_addr = addr;
146        self
147    }
148
149    pub fn with_port(mut self, port: u16) -> Self {
150        self.bind_addr.set_port(port);
151        self
152    }
153
154    pub fn with_devices(mut self, devices: usize) -> Self {
155        self.devices = devices;
156        self
157    }
158
159    pub fn with_points(mut self, points: usize) -> Self {
160        self.points_per_device = points;
161        self
162    }
163
164    pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
165        self.rtu_mode = true;
166        self.serial_port = Some(serial_port.into());
167        self
168    }
169
170    pub fn with_tags(mut self, tags: Tags) -> Self {
171        self.tags = tags;
172        self
173    }
174}
175
176impl Default for ModbusCommand {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182#[async_trait]
183impl Command for ModbusCommand {
184    fn name(&self) -> &str {
185        "modbus"
186    }
187
188    fn description(&self) -> &str {
189        "Start a Modbus TCP/RTU simulator"
190    }
191
192    fn requires_engine(&self) -> bool {
193        true
194    }
195
196    fn supports_shutdown(&self) -> bool {
197        true
198    }
199
200    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
201        let format = ctx.output().format();
202        let is_quiet = ctx.is_quiet();
203        let is_verbose = ctx.is_verbose();
204        let is_debug = ctx.is_debug();
205
206        if !is_quiet {
207            if matches!(format, OutputFormat::Table) {
208                let output = ctx.output();
209                if self.rtu_mode {
210                    output.header("Modbus RTU Simulator");
211                    output.kv("Serial Port", self.serial_port.as_deref().unwrap_or("N/A"));
212                } else {
213                    output.header("Modbus TCP Simulator");
214                    output.kv("Bind Address", self.bind_addr);
215                }
216                output.kv("Devices", self.devices);
217                output.kv("Points per Device", self.points_per_device);
218                output.kv("Total Points", self.devices * self.points_per_device);
219            }
220        }
221
222        // Verbose: show extra configuration details
223        if is_verbose {
224            ctx.vprintln(format!(
225                "  Protocol Mode: {}",
226                if self.rtu_mode { "RTU" } else { "TCP" }
227            ));
228            ctx.vprintln(format!(
229                "  Points Distribution: {} per register type",
230                self.points_per_device / 4
231            ));
232        }
233
234        // Debug: dump full configuration
235        if is_debug {
236            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
237            ctx.dprintln(format!(
238                "RTU mode: {}, Serial: {:?}",
239                self.rtu_mode, self.serial_port
240            ));
241            ctx.dprintln(format!(
242                "Devices: {}, Points/device: {}",
243                self.devices, self.points_per_device
244            ));
245        }
246
247        self.start_server(ctx).await?;
248
249        let points_per_type = self.points_per_device / 4;
250
251        if !is_quiet {
252            match format {
253                OutputFormat::Table => {
254                    let colors_enabled = ctx.colors_enabled();
255                    let builder = TableBuilder::new(colors_enabled).header([
256                        "Unit ID",
257                        "Holding Regs",
258                        "Input Regs",
259                        "Coils",
260                        "Discrete",
261                        "Status",
262                    ]);
263
264                    let devices = self.devices;
265                    let pts = points_per_type.to_string();
266                    let table = PaginatedTable::default().render(builder, devices, 6, |i| {
267                        let unit_id = (i + 1).to_string();
268                        (
269                            vec![
270                                unit_id,
271                                pts.clone(),
272                                pts.clone(),
273                                pts.clone(),
274                                pts.clone(),
275                                "Online".into(),
276                            ],
277                            StatusType::Success,
278                        )
279                    });
280                    table.print();
281                }
282                _ => {
283                    #[derive(Serialize)]
284                    struct ModbusServerInfo {
285                        protocol: String,
286                        bind_address: String,
287                        devices: usize,
288                        points_per_device: usize,
289                        total_points: usize,
290                        rtu_mode: bool,
291                        serial_port: Option<String>,
292                        device_list: Vec<ModbusDeviceInfo>,
293                        status: String,
294                    }
295                    #[derive(Serialize)]
296                    struct ModbusDeviceInfo {
297                        unit_id: usize,
298                        holding_registers: usize,
299                        input_registers: usize,
300                        coils: usize,
301                        discrete_inputs: usize,
302                        status: String,
303                    }
304                    let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
305                        .map(|i| ModbusDeviceInfo {
306                            unit_id: i + 1,
307                            holding_registers: points_per_type,
308                            input_registers: points_per_type,
309                            coils: points_per_type,
310                            discrete_inputs: points_per_type,
311                            status: "Online".into(),
312                        })
313                        .collect();
314                    let info = ModbusServerInfo {
315                        protocol: if self.rtu_mode {
316                            "Modbus RTU".into()
317                        } else {
318                            "Modbus TCP".into()
319                        },
320                        bind_address: self.bind_addr.to_string(),
321                        devices: self.devices,
322                        points_per_device: self.points_per_device,
323                        total_points: self.devices * self.points_per_device,
324                        rtu_mode: self.rtu_mode,
325                        serial_port: self.serial_port.clone(),
326                        device_list,
327                        status: "Online".into(),
328                    };
329                    let _ = ctx.output().write(&info);
330                }
331            }
332        }
333
334        if !is_quiet {
335            ctx.output().info("Press Ctrl+C to stop");
336        }
337        ctx.shutdown_signal().notified().await;
338
339        self.stop_server(ctx).await?;
340        if !is_quiet {
341            ctx.output().success("Modbus simulator stopped");
342        }
343
344        Ok(CommandOutput::quiet_success())
345    }
346}
347
348#[async_trait]
349impl ProtocolCommand for ModbusCommand {
350    fn protocol(&self) -> Protocol {
351        if self.rtu_mode {
352            Protocol::ModbusRtu
353        } else {
354            Protocol::ModbusTcp
355        }
356    }
357
358    fn default_port(&self) -> u16 {
359        502
360    }
361
362    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
363        let output = ctx.output();
364
365        // Advisory port availability check before starting
366        check_port_availability(self.bind_addr).await;
367
368        let spinner = output.spinner("Starting Modbus server...");
369
370        let config = ServerConfigV2 {
371            bind_address: self.bind_addr,
372            ..Default::default()
373        };
374
375        let server = Arc::new(ModbusTcpServerV2::new(config));
376
377        for i in 0..self.devices {
378            let unit_id = (i + 1) as u8;
379            let points = (self.points_per_device / 4) as u16;
380            let device_config = ModbusDeviceConfig {
381                unit_id,
382                name: format!("Device-{}", unit_id),
383                holding_registers: points,
384                input_registers: points,
385                coils: points,
386                discrete_inputs: points,
387                response_delay_ms: 0,
388                tags: self.tags.clone(),
389            };
390            let device = ModbusDevice::new(device_config);
391            server.add_device(device);
392        }
393
394        {
395            let mut server_guard = self.server.lock().await;
396            *server_guard = Some(server.clone());
397        }
398
399        let server_clone = server.clone();
400        let task = tokio::spawn(async move {
401            if let Err(e) = server_clone.run().await {
402                tracing::error!("Modbus server error: {}", e);
403            }
404        });
405
406        {
407            let mut task_guard = self.server_task.lock().await;
408            *task_guard = Some(task);
409        }
410
411        tokio::time::sleep(Duration::from_millis(100)).await;
412
413        // Check if server task exited early (likely a bind failure / port-in-use)
414        {
415            let task_guard = self.server_task.lock().await;
416            if let Some(task) = task_guard.as_ref() {
417                if task.is_finished() {
418                    spinner.finish_with_message("Failed to start server");
419                    return Err(crate::error::CliError::PortInUse {
420                        port: self.bind_addr.port(),
421                    });
422                }
423            }
424        }
425
426        spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
427        Ok(())
428    }
429
430    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
431        if let Some(server) = self.server.lock().await.as_ref() {
432            server.shutdown();
433        }
434
435        if let Some(task) = self.server_task.lock().await.take() {
436            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
437        }
438
439        Ok(())
440    }
441}
442
443// =============================================================================
444// OPC UA Command
445// =============================================================================
446
447/// OPC UA protocol command.
448pub struct OpcuaCommand {
449    bind_addr: SocketAddr,
450    endpoint_path: String,
451    nodes: usize,
452    security_mode: String,
453    /// Device tags.
454    tags: Tags,
455    /// Server instance (for shutdown).
456    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
457    /// Server task handle.
458    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
459}
460
461impl OpcuaCommand {
462    pub fn new() -> Self {
463        Self {
464            bind_addr: "0.0.0.0:4840".parse().unwrap(),
465            endpoint_path: "/".into(),
466            nodes: 1000,
467            security_mode: "None".into(),
468            tags: Tags::new(),
469            server: Arc::new(Mutex::new(None)),
470            server_task: Arc::new(Mutex::new(None)),
471        }
472    }
473
474    pub fn with_port(mut self, port: u16) -> Self {
475        self.bind_addr.set_port(port);
476        self
477    }
478
479    pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
480        self.endpoint_path = path.into();
481        self
482    }
483
484    pub fn with_nodes(mut self, nodes: usize) -> Self {
485        self.nodes = nodes;
486        self
487    }
488
489    pub fn with_security(mut self, mode: impl Into<String>) -> Self {
490        self.security_mode = mode.into();
491        self
492    }
493
494    pub fn with_tags(mut self, tags: Tags) -> Self {
495        self.tags = tags;
496        self
497    }
498}
499
500impl Default for OpcuaCommand {
501    fn default() -> Self {
502        Self::new()
503    }
504}
505
506#[async_trait]
507impl Command for OpcuaCommand {
508    fn name(&self) -> &str {
509        "opcua"
510    }
511
512    fn description(&self) -> &str {
513        "Start an OPC UA server simulator"
514    }
515
516    fn requires_engine(&self) -> bool {
517        true
518    }
519
520    fn supports_shutdown(&self) -> bool {
521        true
522    }
523
524    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
525        let format = ctx.output().format();
526        let is_quiet = ctx.is_quiet();
527        let is_verbose = ctx.is_verbose();
528        let is_debug = ctx.is_debug();
529
530        if !is_quiet {
531            if matches!(format, OutputFormat::Table) {
532                let output = ctx.output();
533                output.header("OPC UA Simulator");
534                output.kv(
535                    "Endpoint",
536                    format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
537                );
538                output.kv("Nodes", self.nodes);
539                output.kv("Security Mode", &self.security_mode);
540            }
541        }
542
543        // Verbose: show extra details
544        if is_verbose {
545            ctx.vprintln(format!("  Bind Address: {}", self.bind_addr));
546            ctx.vprintln(format!("  Endpoint Path: {}", self.endpoint_path));
547            ctx.vprintln(format!("  Max Subscriptions: 1000"));
548            ctx.vprintln(format!("  Max Monitored Items: 10000"));
549        }
550
551        // Debug: dump full configuration
552        if is_debug {
553            ctx.dprintln(format!(
554                "Full endpoint URL: opc.tcp://{}{}",
555                self.bind_addr, self.endpoint_path
556            ));
557            ctx.dprintln(format!("Node count: {}", self.nodes));
558            ctx.dprintln(format!("Security mode: {}", self.security_mode));
559            ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
560        }
561
562        self.start_server(ctx).await?;
563
564        if !is_quiet {
565            match format {
566                OutputFormat::Table => {
567                    let colors_enabled = ctx.colors_enabled();
568                    let table = TableBuilder::new(colors_enabled)
569                        .header(["Namespace", "Nodes", "Subscriptions", "Status"])
570                        .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
571                        .status_row(
572                            ["1", &self.nodes.to_string(), "0", "Online"],
573                            StatusType::Success,
574                        );
575                    table.print();
576                }
577                _ => {
578                    #[derive(Serialize)]
579                    struct OpcuaServerInfo {
580                        protocol: String,
581                        endpoint: String,
582                        nodes: usize,
583                        security_mode: String,
584                        namespaces: Vec<NamespaceInfo>,
585                        status: String,
586                    }
587                    #[derive(Serialize)]
588                    struct NamespaceInfo {
589                        index: u32,
590                        nodes: String,
591                        subscriptions: u32,
592                        status: String,
593                    }
594                    let info = OpcuaServerInfo {
595                        protocol: "OPC UA".into(),
596                        endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
597                        nodes: self.nodes,
598                        security_mode: self.security_mode.clone(),
599                        namespaces: vec![
600                            NamespaceInfo {
601                                index: 0,
602                                nodes: "Standard".into(),
603                                subscriptions: 0,
604                                status: "Ready".into(),
605                            },
606                            NamespaceInfo {
607                                index: 1,
608                                nodes: self.nodes.to_string(),
609                                subscriptions: 0,
610                                status: "Online".into(),
611                            },
612                        ],
613                        status: "Online".into(),
614                    };
615                    let _ = ctx.output().write(&info);
616                }
617            }
618        }
619
620        if !is_quiet {
621            ctx.output().info("Press Ctrl+C to stop");
622        }
623        ctx.shutdown_signal().notified().await;
624
625        self.stop_server(ctx).await?;
626        if !is_quiet {
627            ctx.output().success("OPC UA simulator stopped");
628        }
629
630        Ok(CommandOutput::quiet_success())
631    }
632}
633
634#[async_trait]
635impl ProtocolCommand for OpcuaCommand {
636    fn protocol(&self) -> Protocol {
637        Protocol::OpcUa
638    }
639
640    fn default_port(&self) -> u16 {
641        4840
642    }
643
644    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
645        let output = ctx.output();
646        let spinner = output.spinner("Starting OPC UA server...");
647
648        let config = OpcUaServerConfig {
649            endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
650            server_name: "Mabinogion OPC UA Simulator".to_string(),
651            max_subscriptions: 1000,
652            max_monitored_items: 10000,
653            ..Default::default()
654        };
655
656        let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
657            crate::error::CliError::ExecutionFailed {
658                message: format!("Failed to create OPC UA server: {}", e),
659            }
660        })?);
661
662        // Add sample nodes with diverse types and mixed read-only / writable access.
663        // Even-indexed nodes are writable, odd-indexed are read-only.
664        let node_count = self.nodes.min(100);
665        for i in 0..node_count {
666            let node_id = format!("ns=2;i={}", 1000 + i);
667            let name = format!("Variable_{}", i);
668            let value = (i as f64) * 0.1;
669
670            if i % 2 == 0 {
671                let _ = server.add_writable_variable(node_id, name, value);
672            } else {
673                let _ = server.add_variable(node_id, name, value);
674            }
675        }
676
677        {
678            let mut server_guard = self.server.lock().await;
679            *server_guard = Some(server.clone());
680        }
681
682        let server_clone = server.clone();
683        let task = tokio::spawn(async move {
684            if let Err(e) = server_clone.start().await {
685                tracing::error!("OPC UA server error: {}", e);
686            }
687        });
688
689        {
690            let mut task_guard = self.server_task.lock().await;
691            *task_guard = Some(task);
692        }
693
694        tokio::time::sleep(Duration::from_millis(100)).await;
695
696        spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
697        Ok(())
698    }
699
700    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
701        if let Some(server) = self.server.lock().await.as_ref() {
702            let _ = server.stop().await;
703        }
704
705        if let Some(task) = self.server_task.lock().await.take() {
706            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
707        }
708
709        Ok(())
710    }
711}
712
713// =============================================================================
714// BACnet Command
715// =============================================================================
716
717/// BACnet protocol command.
718pub struct BacnetCommand {
719    bind_addr: SocketAddr,
720    device_instance: u32,
721    objects: usize,
722    bbmd_enabled: bool,
723    /// Device tags.
724    tags: Tags,
725    /// Server instance (for shutdown).
726    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
727    /// Server task handle.
728    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
729}
730
731impl BacnetCommand {
732    pub fn new() -> Self {
733        Self {
734            bind_addr: "0.0.0.0:47808".parse().unwrap(),
735            device_instance: 1234,
736            objects: 0,
737            bbmd_enabled: false,
738            tags: Tags::new(),
739            server: Arc::new(Mutex::new(None)),
740            server_task: Arc::new(Mutex::new(None)),
741        }
742    }
743
744    pub fn with_port(mut self, port: u16) -> Self {
745        self.bind_addr.set_port(port);
746        self
747    }
748
749    pub fn with_device_instance(mut self, instance: u32) -> Self {
750        self.device_instance = instance;
751        self
752    }
753
754    pub fn with_objects(mut self, objects: usize) -> Self {
755        self.objects = objects;
756        self
757    }
758
759    pub fn with_bbmd(mut self, enabled: bool) -> Self {
760        self.bbmd_enabled = enabled;
761        self
762    }
763
764    pub fn with_tags(mut self, tags: Tags) -> Self {
765        self.tags = tags;
766        self
767    }
768}
769
770impl Default for BacnetCommand {
771    fn default() -> Self {
772        Self::new()
773    }
774}
775
776#[async_trait]
777impl Command for BacnetCommand {
778    fn name(&self) -> &str {
779        "bacnet"
780    }
781
782    fn description(&self) -> &str {
783        "Start a BACnet/IP simulator"
784    }
785
786    fn requires_engine(&self) -> bool {
787        true
788    }
789
790    fn supports_shutdown(&self) -> bool {
791        true
792    }
793
794    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
795        let format = ctx.output().format();
796        let is_quiet = ctx.is_quiet();
797        let is_verbose = ctx.is_verbose();
798        let is_debug = ctx.is_debug();
799
800        if !is_quiet {
801            if matches!(format, OutputFormat::Table) {
802                let output = ctx.output();
803                output.header("BACnet/IP Simulator");
804                output.kv("Bind Address", self.bind_addr);
805                output.kv("Device Instance", self.device_instance);
806                output.kv("Objects", self.objects);
807                output.kv(
808                    "BBMD",
809                    if self.bbmd_enabled {
810                        "Enabled"
811                    } else {
812                        "Disabled"
813                    },
814                );
815            }
816        }
817
818        // Verbose: show extra details
819        if is_verbose {
820            let per_type = if self.objects > 0 {
821                std::cmp::max(1, self.objects / 4)
822            } else {
823                0
824            };
825            if per_type == 0 {
826                ctx.vprintln("  Demo Objects: disabled (Device object only)");
827            } else {
828                ctx.vprintln(format!(
829                    "  Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})",
830                    per_type, per_type, per_type, per_type, per_type
831                ));
832            }
833            ctx.vprintln(format!("  Device Name: Mabinogion BACnet Simulator"));
834        }
835
836        // Debug: dump full configuration
837        if is_debug {
838            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
839            ctx.dprintln(format!("Device instance: {}", self.device_instance));
840            ctx.dprintln(format!(
841                "Total objects: {}, BBMD: {}",
842                self.objects, self.bbmd_enabled
843            ));
844        }
845
846        self.start_server(ctx).await?;
847
848        let per_type = if self.objects > 0 {
849            std::cmp::max(1, self.objects / 4)
850        } else {
851            0
852        };
853
854        if !is_quiet {
855            match format {
856                OutputFormat::Table => {
857                    let colors_enabled = ctx.colors_enabled();
858                    let demo_status = if per_type == 0 {
859                        "Not created"
860                    } else {
861                        "Active"
862                    };
863                    let table = TableBuilder::new(colors_enabled)
864                        .header(["Object Type", "Count", "Status"])
865                        .status_row(["Device", "1", "Online"], StatusType::Success)
866                        .status_row(
867                            ["Analog Input", &per_type.to_string(), demo_status],
868                            StatusType::Success,
869                        )
870                        .status_row(
871                            ["Analog Output", &per_type.to_string(), demo_status],
872                            StatusType::Success,
873                        )
874                        .status_row(
875                            ["Binary Input", &per_type.to_string(), demo_status],
876                            StatusType::Success,
877                        )
878                        .status_row(
879                            ["Binary Output", &per_type.to_string(), demo_status],
880                            StatusType::Success,
881                        );
882                    table.print();
883                }
884                _ => {
885                    #[derive(Serialize)]
886                    struct BacnetServerInfo {
887                        protocol: String,
888                        bind_address: String,
889                        device_instance: u32,
890                        objects: usize,
891                        bbmd_enabled: bool,
892                        object_types: Vec<ObjectTypeInfo>,
893                        status: String,
894                    }
895                    #[derive(Serialize)]
896                    struct ObjectTypeInfo {
897                        object_type: String,
898                        count: usize,
899                        status: String,
900                    }
901                    let info = BacnetServerInfo {
902                        protocol: "BACnet/IP".into(),
903                        bind_address: self.bind_addr.to_string(),
904                        device_instance: self.device_instance,
905                        objects: self.objects,
906                        bbmd_enabled: self.bbmd_enabled,
907                        object_types: vec![
908                            ObjectTypeInfo {
909                                object_type: "Device".into(),
910                                count: 1,
911                                status: "Online".into(),
912                            },
913                            ObjectTypeInfo {
914                                object_type: "Analog Input".into(),
915                                count: per_type,
916                                status: if per_type == 0 {
917                                    "Not created".into()
918                                } else {
919                                    "Active".into()
920                                },
921                            },
922                            ObjectTypeInfo {
923                                object_type: "Analog Output".into(),
924                                count: per_type,
925                                status: if per_type == 0 {
926                                    "Not created".into()
927                                } else {
928                                    "Active".into()
929                                },
930                            },
931                            ObjectTypeInfo {
932                                object_type: "Binary Input".into(),
933                                count: per_type,
934                                status: if per_type == 0 {
935                                    "Not created".into()
936                                } else {
937                                    "Active".into()
938                                },
939                            },
940                            ObjectTypeInfo {
941                                object_type: "Binary Output".into(),
942                                count: per_type,
943                                status: if per_type == 0 {
944                                    "Not created".into()
945                                } else {
946                                    "Active".into()
947                                },
948                            },
949                        ],
950                        status: "Online".into(),
951                    };
952                    let _ = ctx.output().write(&info);
953                }
954            }
955        }
956
957        if !is_quiet {
958            ctx.output().info("Press Ctrl+C to stop");
959        }
960        ctx.shutdown_signal().notified().await;
961
962        self.stop_server(ctx).await?;
963        if !is_quiet {
964            ctx.output().success("BACnet simulator stopped");
965        }
966
967        Ok(CommandOutput::quiet_success())
968    }
969}
970
971#[async_trait]
972impl ProtocolCommand for BacnetCommand {
973    fn protocol(&self) -> Protocol {
974        Protocol::BacnetIp
975    }
976
977    fn default_port(&self) -> u16 {
978        47808
979    }
980
981    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
982        let output = ctx.output();
983        let spinner = output.spinner("Starting BACnet server...");
984
985        let config = BacnetServerConfig::new(self.device_instance)
986            .with_bind_addr(self.bind_addr)
987            .with_device_name("Mabinogion BACnet Simulator");
988
989        let registry = ObjectRegistry::new();
990
991        if self.objects > 0 {
992            let descriptors = default_object_descriptors();
993            let objects_per_type = std::cmp::max(1, self.objects / descriptors.len());
994            registry.populate_standard_objects(&descriptors, objects_per_type);
995        }
996
997        let server = Arc::new(BACnetServer::new(config, registry));
998
999        {
1000            let mut server_guard = self.server.lock().await;
1001            *server_guard = Some(server.clone());
1002        }
1003
1004        let server_clone = server.clone();
1005        let task = tokio::spawn(async move {
1006            if let Err(e) = server_clone.run().await {
1007                tracing::error!("BACnet server error: {}", e);
1008            }
1009        });
1010
1011        {
1012            let mut task_guard = self.server_task.lock().await;
1013            *task_guard = Some(task);
1014        }
1015
1016        tokio::time::sleep(Duration::from_millis(100)).await;
1017
1018        spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
1019        Ok(())
1020    }
1021
1022    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1023        if let Some(server) = self.server.lock().await.as_ref() {
1024            server.shutdown();
1025        }
1026
1027        if let Some(task) = self.server_task.lock().await.take() {
1028            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1029        }
1030
1031        Ok(())
1032    }
1033}
1034
1035// =============================================================================
1036// KNX Command
1037// =============================================================================
1038
1039/// KNX protocol command.
1040pub struct KnxCommand {
1041    bind_addr: SocketAddr,
1042    individual_address: String,
1043    group_objects: usize,
1044    /// Device tags.
1045    tags: Tags,
1046    /// Server instance (for shutdown).
1047    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
1048    /// Server task handle.
1049    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
1050}
1051
1052impl KnxCommand {
1053    pub fn new() -> Self {
1054        Self {
1055            bind_addr: "0.0.0.0:3671".parse().unwrap(),
1056            individual_address: "1.1.1".into(),
1057            group_objects: 100,
1058            tags: Tags::new(),
1059            server: Arc::new(Mutex::new(None)),
1060            server_task: Arc::new(Mutex::new(None)),
1061        }
1062    }
1063
1064    pub fn with_port(mut self, port: u16) -> Self {
1065        self.bind_addr.set_port(port);
1066        self
1067    }
1068
1069    pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
1070        self.individual_address = addr.into();
1071        self
1072    }
1073
1074    pub fn with_group_objects(mut self, count: usize) -> Self {
1075        self.group_objects = count;
1076        self
1077    }
1078
1079    pub fn with_tags(mut self, tags: Tags) -> Self {
1080        self.tags = tags;
1081        self
1082    }
1083}
1084
1085impl Default for KnxCommand {
1086    fn default() -> Self {
1087        Self::new()
1088    }
1089}
1090
1091#[async_trait]
1092impl Command for KnxCommand {
1093    fn name(&self) -> &str {
1094        "knx"
1095    }
1096
1097    fn description(&self) -> &str {
1098        "Start a KNXnet/IP simulator"
1099    }
1100
1101    fn requires_engine(&self) -> bool {
1102        true
1103    }
1104
1105    fn supports_shutdown(&self) -> bool {
1106        true
1107    }
1108
1109    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
1110        let format = ctx.output().format();
1111        let is_quiet = ctx.is_quiet();
1112        let is_verbose = ctx.is_verbose();
1113        let is_debug = ctx.is_debug();
1114
1115        if !is_quiet {
1116            if matches!(format, OutputFormat::Table) {
1117                let output = ctx.output();
1118                output.header("KNXnet/IP Simulator");
1119                output.kv("Bind Address", self.bind_addr);
1120                output.kv("Individual Address", &self.individual_address);
1121                output.kv("Group Objects", self.group_objects);
1122            }
1123        }
1124
1125        // Verbose: show extra details
1126        if is_verbose {
1127            ctx.vprintln(format!("  Max Connections: 10"));
1128            ctx.vprintln(format!("  Services: Core, Device Management, Tunneling"));
1129        }
1130
1131        // Debug: dump full configuration
1132        if is_debug {
1133            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
1134            ctx.dprintln(format!("Individual address: {}", self.individual_address));
1135            ctx.dprintln(format!("Group objects: {}", self.group_objects));
1136        }
1137
1138        self.start_server(ctx).await?;
1139
1140        if !is_quiet {
1141            match format {
1142                OutputFormat::Table => {
1143                    let colors_enabled = ctx.colors_enabled();
1144                    let table = TableBuilder::new(colors_enabled)
1145                        .header(["Service", "Status"])
1146                        .status_row(["Core", "Ready"], StatusType::Success)
1147                        .status_row(["Device Management", "Ready"], StatusType::Success)
1148                        .status_row(["Tunneling", "Ready"], StatusType::Success);
1149                    table.print();
1150                }
1151                _ => {
1152                    #[derive(Serialize)]
1153                    struct KnxServerInfo {
1154                        protocol: String,
1155                        bind_address: String,
1156                        individual_address: String,
1157                        group_objects: usize,
1158                        services: Vec<ServiceInfo>,
1159                        status: String,
1160                    }
1161                    #[derive(Serialize)]
1162                    struct ServiceInfo {
1163                        service: String,
1164                        status: String,
1165                    }
1166                    let info = KnxServerInfo {
1167                        protocol: "KNXnet/IP".into(),
1168                        bind_address: self.bind_addr.to_string(),
1169                        individual_address: self.individual_address.clone(),
1170                        group_objects: self.group_objects,
1171                        services: vec![
1172                            ServiceInfo {
1173                                service: "Core".into(),
1174                                status: "Ready".into(),
1175                            },
1176                            ServiceInfo {
1177                                service: "Device Management".into(),
1178                                status: "Ready".into(),
1179                            },
1180                            ServiceInfo {
1181                                service: "Tunneling".into(),
1182                                status: "Ready".into(),
1183                            },
1184                        ],
1185                        status: "Online".into(),
1186                    };
1187                    let _ = ctx.output().write(&info);
1188                }
1189            }
1190        }
1191
1192        if !is_quiet {
1193            ctx.output().info("Press Ctrl+C to stop");
1194        }
1195        ctx.shutdown_signal().notified().await;
1196
1197        self.stop_server(ctx).await?;
1198        if !is_quiet {
1199            ctx.output().success("KNX simulator stopped");
1200        }
1201
1202        Ok(CommandOutput::quiet_success())
1203    }
1204}
1205
1206#[async_trait]
1207impl ProtocolCommand for KnxCommand {
1208    fn protocol(&self) -> Protocol {
1209        Protocol::KnxIp
1210    }
1211
1212    fn default_port(&self) -> u16 {
1213        3671
1214    }
1215
1216    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1217        let output = ctx.output();
1218        let spinner = output.spinner("Starting KNX server...");
1219
1220        // Parse individual address
1221        let individual_address: IndividualAddress =
1222            self.individual_address.parse().map_err(|_| {
1223                crate::error::CliError::ExecutionFailed {
1224                    message: format!("Invalid individual address: {}", self.individual_address),
1225                }
1226            })?;
1227
1228        let config = KnxServerConfig {
1229            bind_addr: self.bind_addr,
1230            individual_address,
1231            max_connections: 256,
1232            ..Default::default()
1233        };
1234
1235        // Create group objects based on --groups parameter
1236        let group_table = Arc::new(GroupObjectTable::new());
1237        let dpt_types = [
1238            DptId::new(1, 1),   // Switch (bool)
1239            DptId::new(5, 1),   // Scaling (0-100%)
1240            DptId::new(9, 1),   // Temperature (float16)
1241            DptId::new(9, 4),   // Lux
1242            DptId::new(9, 7),   // Humidity
1243            DptId::new(12, 1),  // Counter (u32)
1244            DptId::new(13, 1),  // Counter signed (i32)
1245            DptId::new(14, 56), // Float (f32)
1246        ];
1247        let dpt_names = [
1248            "Switch",
1249            "Scaling",
1250            "Temperature",
1251            "Lux",
1252            "Humidity",
1253            "Counter",
1254            "SignedCounter",
1255            "Float",
1256        ];
1257
1258        for i in 0..self.group_objects {
1259            let main = ((i / 256) + 1) as u8;
1260            let middle = ((i / 8) % 8) as u8;
1261            let sub = (i % 256) as u8;
1262            let addr = GroupAddress::three_level(main, middle, sub);
1263            let dpt_idx = i % dpt_types.len();
1264            let name = format!("{}_{}", dpt_names[dpt_idx], i);
1265            if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1266                tracing::warn!("Failed to create group object {}: {}", i, e);
1267            }
1268        }
1269
1270        let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1271
1272        {
1273            let mut server_guard = self.server.lock().await;
1274            *server_guard = Some(server.clone());
1275        }
1276
1277        let server_clone = server.clone();
1278        let task = tokio::spawn(async move {
1279            if let Err(e) = server_clone.start().await {
1280                tracing::error!("KNX server error: {}", e);
1281            }
1282        });
1283
1284        {
1285            let mut task_guard = self.server_task.lock().await;
1286            *task_guard = Some(task);
1287        }
1288
1289        tokio::time::sleep(Duration::from_millis(100)).await;
1290
1291        spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1292        Ok(())
1293    }
1294
1295    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1296        // Take server out to call stop (KnxServer::stop has Send issues with parking_lot)
1297        let server_opt = self.server.lock().await.take();
1298        if let Some(server) = server_opt {
1299            // Use spawn_blocking to handle the non-Send future
1300            let _ = tokio::task::spawn_blocking(move || {
1301                let rt = tokio::runtime::Handle::current();
1302                rt.block_on(async {
1303                    let _ = server.stop().await;
1304                })
1305            })
1306            .await;
1307        }
1308
1309        if let Some(task) = self.server_task.lock().await.take() {
1310            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1311        }
1312
1313        Ok(())
1314    }
1315}