Skip to main content

mabi_cli/commands/
protocol.rs

1//! Protocol-specific commands.
2//!
3//! Provides subcommands for each supported protocol.
4
5use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19// Protocol-specific imports
20use mabi_bacnet::prelude::{
21    default_object_descriptors, BACnetServer, ObjectRegistry, ServerConfig as BacnetServerConfig,
22};
23use mabi_knx::{
24    DptId, GroupAddress, GroupObjectTable, IndividualAddress, KnxServer, KnxServerConfig,
25};
26use mabi_modbus::{tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig, ModbusTcpServerV2};
27use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
28
29/// Checks if a port is already in use and provides diagnostics.
30///
31/// This is an advisory check — it warns but does not block. If the port is held by
32/// a zombie/suspended process, it provides specific diagnostic instructions.
33async fn check_port_availability(addr: SocketAddr) {
34    use tokio::io::{AsyncReadExt, AsyncWriteExt};
35    use tokio::net::TcpStream;
36
37    // Try to connect to the port
38    let connect_result =
39        tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await;
40
41    match connect_result {
42        Ok(Ok(_first_stream)) => {
43            // Port is in use — something is listening. Try a Modbus probe.
44            drop(_first_stream);
45
46            let probe_result = tokio::time::timeout(Duration::from_secs(1), async {
47                let mut stream = TcpStream::connect(addr).await?;
48                // Send Modbus TCP ReadHoldingRegisters: txn=1, proto=0, len=6, unit=1, fc=3, addr=0, count=1
49                let request: [u8; 12] = [
50                    0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x03, 0x00, 0x00, 0x00, 0x01,
51                ];
52                stream.write_all(&request).await?;
53                let mut response = [0u8; 32];
54                let n = stream.read(&mut response).await?;
55                Ok::<_, std::io::Error>(n)
56            })
57            .await;
58
59            match probe_result {
60                Ok(Ok(n)) if n >= 7 => {
61                    tracing::warn!(
62                        port = addr.port(),
63                        "Port {} is already in use by a responding Modbus server. \
64                         The new server will fail to bind.",
65                        addr.port()
66                    );
67                }
68                _ => {
69                    // TCP connects but Modbus doesn't respond — possible zombie
70                    tracing::warn!(
71                        port = addr.port(),
72                        "Port {} is in use: TCP connects but no Modbus response. \
73                         This may be a suspended (zombie) process holding the port.\n  \
74                         Diagnostic: lsof -i :{} | grep LISTEN\n  \
75                         To kill:    kill $(lsof -ti :{} -sTCP:LISTEN)",
76                        addr.port(),
77                        addr.port(),
78                        addr.port()
79                    );
80                }
81            }
82        }
83        Ok(Err(_)) | Err(_) => {
84            // Port is available (connection refused or timeout) — good
85            tracing::debug!(port = addr.port(), "Port {} is available", addr.port());
86        }
87    }
88}
89
90/// Base trait for protocol-specific commands.
91#[async_trait]
92pub trait ProtocolCommand: Command {
93    /// Get the protocol type.
94    fn protocol(&self) -> Protocol;
95
96    /// Get the default port.
97    fn default_port(&self) -> u16;
98
99    /// Start the protocol server.
100    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
101
102    /// Stop the protocol server.
103    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
104}
105
106// =============================================================================
107// Modbus Command
108// =============================================================================
109
110/// Modbus protocol command.
111pub struct ModbusCommand {
112    /// Binding address.
113    bind_addr: SocketAddr,
114    /// Number of devices to simulate.
115    devices: usize,
116    /// Points per device.
117    points_per_device: usize,
118    /// Use RTU mode instead of TCP.
119    rtu_mode: bool,
120    /// Serial port for RTU mode.
121    serial_port: Option<String>,
122    /// Device tags.
123    tags: Tags,
124    /// Server instance (for shutdown).
125    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
126    /// Server task handle.
127    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
128}
129
130impl ModbusCommand {
131    pub fn new() -> Self {
132        Self {
133            bind_addr: "0.0.0.0:502".parse().unwrap(),
134            devices: 1,
135            points_per_device: 100,
136            rtu_mode: false,
137            serial_port: None,
138            tags: Tags::new(),
139            server: Arc::new(Mutex::new(None)),
140            server_task: Arc::new(Mutex::new(None)),
141        }
142    }
143
144    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
145        self.bind_addr = addr;
146        self
147    }
148
149    pub fn with_port(mut self, port: u16) -> Self {
150        self.bind_addr.set_port(port);
151        self
152    }
153
154    pub fn with_devices(mut self, devices: usize) -> Self {
155        self.devices = devices;
156        self
157    }
158
159    pub fn with_points(mut self, points: usize) -> Self {
160        self.points_per_device = points;
161        self
162    }
163
164    pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
165        self.rtu_mode = true;
166        self.serial_port = Some(serial_port.into());
167        self
168    }
169
170    pub fn with_tags(mut self, tags: Tags) -> Self {
171        self.tags = tags;
172        self
173    }
174}
175
176impl Default for ModbusCommand {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182#[async_trait]
183impl Command for ModbusCommand {
184    fn name(&self) -> &str {
185        "modbus"
186    }
187
188    fn description(&self) -> &str {
189        "Start a Modbus TCP/RTU simulator"
190    }
191
192    fn requires_engine(&self) -> bool {
193        true
194    }
195
196    fn supports_shutdown(&self) -> bool {
197        true
198    }
199
200    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
201        let format = ctx.output().format();
202        let is_quiet = ctx.is_quiet();
203        let is_verbose = ctx.is_verbose();
204        let is_debug = ctx.is_debug();
205
206        if !is_quiet {
207            if matches!(format, OutputFormat::Table) {
208                let output = ctx.output();
209                if self.rtu_mode {
210                    output.header("Modbus RTU Simulator");
211                    output.kv("Serial Port", self.serial_port.as_deref().unwrap_or("N/A"));
212                } else {
213                    output.header("Modbus TCP Simulator");
214                    output.kv("Bind Address", self.bind_addr);
215                }
216                output.kv("Devices", self.devices);
217                output.kv("Points per Device", self.points_per_device);
218                output.kv("Total Points", self.devices * self.points_per_device);
219            }
220        }
221
222        // Verbose: show extra configuration details
223        if is_verbose {
224            ctx.vprintln(format!(
225                "  Protocol Mode: {}",
226                if self.rtu_mode { "RTU" } else { "TCP" }
227            ));
228            ctx.vprintln(format!(
229                "  Points Distribution: {} per register type",
230                self.points_per_device / 4
231            ));
232        }
233
234        // Debug: dump full configuration
235        if is_debug {
236            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
237            ctx.dprintln(format!(
238                "RTU mode: {}, Serial: {:?}",
239                self.rtu_mode, self.serial_port
240            ));
241            ctx.dprintln(format!(
242                "Devices: {}, Points/device: {}",
243                self.devices, self.points_per_device
244            ));
245        }
246
247        self.start_server(ctx).await?;
248
249        let points_per_type = self.points_per_device / 4;
250
251        if !is_quiet {
252            match format {
253                OutputFormat::Table => {
254                    let colors_enabled = ctx.colors_enabled();
255                    let builder = TableBuilder::new(colors_enabled).header([
256                        "Unit ID",
257                        "Holding Regs",
258                        "Input Regs",
259                        "Coils",
260                        "Discrete",
261                        "Status",
262                    ]);
263
264                    let devices = self.devices;
265                    let pts = points_per_type.to_string();
266                    let table = PaginatedTable::default().render(builder, devices, 6, |i| {
267                        let unit_id = (i + 1).to_string();
268                        (
269                            vec![
270                                unit_id,
271                                pts.clone(),
272                                pts.clone(),
273                                pts.clone(),
274                                pts.clone(),
275                                "Online".into(),
276                            ],
277                            StatusType::Success,
278                        )
279                    });
280                    table.print();
281                }
282                _ => {
283                    #[derive(Serialize)]
284                    struct ModbusServerInfo {
285                        protocol: String,
286                        bind_address: String,
287                        devices: usize,
288                        points_per_device: usize,
289                        total_points: usize,
290                        rtu_mode: bool,
291                        serial_port: Option<String>,
292                        device_list: Vec<ModbusDeviceInfo>,
293                        status: String,
294                    }
295                    #[derive(Serialize)]
296                    struct ModbusDeviceInfo {
297                        unit_id: usize,
298                        holding_registers: usize,
299                        input_registers: usize,
300                        coils: usize,
301                        discrete_inputs: usize,
302                        status: String,
303                    }
304                    let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
305                        .map(|i| ModbusDeviceInfo {
306                            unit_id: i + 1,
307                            holding_registers: points_per_type,
308                            input_registers: points_per_type,
309                            coils: points_per_type,
310                            discrete_inputs: points_per_type,
311                            status: "Online".into(),
312                        })
313                        .collect();
314                    let info = ModbusServerInfo {
315                        protocol: if self.rtu_mode {
316                            "Modbus RTU".into()
317                        } else {
318                            "Modbus TCP".into()
319                        },
320                        bind_address: self.bind_addr.to_string(),
321                        devices: self.devices,
322                        points_per_device: self.points_per_device,
323                        total_points: self.devices * self.points_per_device,
324                        rtu_mode: self.rtu_mode,
325                        serial_port: self.serial_port.clone(),
326                        device_list,
327                        status: "Online".into(),
328                    };
329                    let _ = ctx.output().write(&info);
330                }
331            }
332        }
333
334        if !is_quiet {
335            ctx.output().info("Press Ctrl+C to stop");
336        }
337        ctx.shutdown_signal().notified().await;
338
339        self.stop_server(ctx).await?;
340        if !is_quiet {
341            ctx.output().success("Modbus simulator stopped");
342        }
343
344        Ok(CommandOutput::quiet_success())
345    }
346}
347
348#[async_trait]
349impl ProtocolCommand for ModbusCommand {
350    fn protocol(&self) -> Protocol {
351        if self.rtu_mode {
352            Protocol::ModbusRtu
353        } else {
354            Protocol::ModbusTcp
355        }
356    }
357
358    fn default_port(&self) -> u16 {
359        502
360    }
361
362    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
363        let output = ctx.output();
364
365        // Advisory port availability check before starting
366        check_port_availability(self.bind_addr).await;
367
368        let spinner = output.spinner("Starting Modbus server...");
369
370        let config = ServerConfigV2 {
371            bind_address: self.bind_addr,
372            ..Default::default()
373        };
374
375        let server = Arc::new(ModbusTcpServerV2::new(config));
376
377        for i in 0..self.devices {
378            let unit_id = (i + 1) as u8;
379            let points = (self.points_per_device / 4) as u16;
380            let device_config = ModbusDeviceConfig {
381                unit_id,
382                name: format!("Device-{}", unit_id),
383                holding_registers: points,
384                input_registers: points,
385                coils: points,
386                discrete_inputs: points,
387                response_delay_ms: 0,
388                tags: self.tags.clone(),
389            };
390            let device = ModbusDevice::new(device_config);
391            server.add_device(device);
392        }
393
394        {
395            let mut server_guard = self.server.lock().await;
396            *server_guard = Some(server.clone());
397        }
398
399        let server_clone = server.clone();
400        let task = tokio::spawn(async move {
401            if let Err(e) = server_clone.run().await {
402                tracing::error!("Modbus server error: {}", e);
403            }
404        });
405
406        {
407            let mut task_guard = self.server_task.lock().await;
408            *task_guard = Some(task);
409        }
410
411        tokio::time::sleep(Duration::from_millis(100)).await;
412
413        // Check if server task exited early (likely a bind failure / port-in-use)
414        {
415            let task_guard = self.server_task.lock().await;
416            if let Some(task) = task_guard.as_ref() {
417                if task.is_finished() {
418                    spinner.finish_with_message("Failed to start server");
419                    return Err(crate::error::CliError::PortInUse {
420                        port: self.bind_addr.port(),
421                    });
422                }
423            }
424        }
425
426        spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
427        Ok(())
428    }
429
430    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
431        if let Some(server) = self.server.lock().await.as_ref() {
432            server.shutdown();
433        }
434
435        if let Some(task) = self.server_task.lock().await.take() {
436            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
437        }
438
439        Ok(())
440    }
441}
442
443// =============================================================================
444// OPC UA Command
445// =============================================================================
446
447/// OPC UA protocol command.
448pub struct OpcuaCommand {
449    bind_addr: SocketAddr,
450    endpoint_path: String,
451    nodes: usize,
452    security_mode: String,
453    /// Device tags.
454    tags: Tags,
455    /// Server instance (for shutdown).
456    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
457    /// Server task handle.
458    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
459}
460
461impl OpcuaCommand {
462    pub fn new() -> Self {
463        Self {
464            bind_addr: "0.0.0.0:4840".parse().unwrap(),
465            endpoint_path: "/".into(),
466            nodes: 1000,
467            security_mode: "None".into(),
468            tags: Tags::new(),
469            server: Arc::new(Mutex::new(None)),
470            server_task: Arc::new(Mutex::new(None)),
471        }
472    }
473
474    pub fn with_port(mut self, port: u16) -> Self {
475        self.bind_addr.set_port(port);
476        self
477    }
478
479    pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
480        self.endpoint_path = path.into();
481        self
482    }
483
484    pub fn with_nodes(mut self, nodes: usize) -> Self {
485        self.nodes = nodes;
486        self
487    }
488
489    pub fn with_security(mut self, mode: impl Into<String>) -> Self {
490        self.security_mode = mode.into();
491        self
492    }
493
494    pub fn with_tags(mut self, tags: Tags) -> Self {
495        self.tags = tags;
496        self
497    }
498}
499
500impl Default for OpcuaCommand {
501    fn default() -> Self {
502        Self::new()
503    }
504}
505
506#[async_trait]
507impl Command for OpcuaCommand {
508    fn name(&self) -> &str {
509        "opcua"
510    }
511
512    fn description(&self) -> &str {
513        "Start an OPC UA server simulator"
514    }
515
516    fn requires_engine(&self) -> bool {
517        true
518    }
519
520    fn supports_shutdown(&self) -> bool {
521        true
522    }
523
524    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
525        let format = ctx.output().format();
526        let is_quiet = ctx.is_quiet();
527        let is_verbose = ctx.is_verbose();
528        let is_debug = ctx.is_debug();
529
530        if !is_quiet {
531            if matches!(format, OutputFormat::Table) {
532                let output = ctx.output();
533                output.header("OPC UA Simulator");
534                output.kv(
535                    "Endpoint",
536                    format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
537                );
538                output.kv("Nodes", self.nodes);
539                output.kv("Security Mode", &self.security_mode);
540            }
541        }
542
543        // Verbose: show extra details
544        if is_verbose {
545            ctx.vprintln(format!("  Bind Address: {}", self.bind_addr));
546            ctx.vprintln(format!("  Endpoint Path: {}", self.endpoint_path));
547            ctx.vprintln(format!("  Max Subscriptions: 1000"));
548            ctx.vprintln(format!("  Max Monitored Items: 10000"));
549        }
550
551        // Debug: dump full configuration
552        if is_debug {
553            ctx.dprintln(format!(
554                "Full endpoint URL: opc.tcp://{}{}",
555                self.bind_addr, self.endpoint_path
556            ));
557            ctx.dprintln(format!("Node count: {}", self.nodes));
558            ctx.dprintln(format!("Security mode: {}", self.security_mode));
559            ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
560        }
561
562        self.start_server(ctx).await?;
563
564        if !is_quiet {
565            match format {
566                OutputFormat::Table => {
567                    let colors_enabled = ctx.colors_enabled();
568                    let table = TableBuilder::new(colors_enabled)
569                        .header(["Namespace", "Nodes", "Subscriptions", "Status"])
570                        .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
571                        .status_row(
572                            ["1", &self.nodes.to_string(), "0", "Online"],
573                            StatusType::Success,
574                        );
575                    table.print();
576                }
577                _ => {
578                    #[derive(Serialize)]
579                    struct OpcuaServerInfo {
580                        protocol: String,
581                        endpoint: String,
582                        nodes: usize,
583                        security_mode: String,
584                        namespaces: Vec<NamespaceInfo>,
585                        status: String,
586                    }
587                    #[derive(Serialize)]
588                    struct NamespaceInfo {
589                        index: u32,
590                        nodes: String,
591                        subscriptions: u32,
592                        status: String,
593                    }
594                    let info = OpcuaServerInfo {
595                        protocol: "OPC UA".into(),
596                        endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
597                        nodes: self.nodes,
598                        security_mode: self.security_mode.clone(),
599                        namespaces: vec![
600                            NamespaceInfo {
601                                index: 0,
602                                nodes: "Standard".into(),
603                                subscriptions: 0,
604                                status: "Ready".into(),
605                            },
606                            NamespaceInfo {
607                                index: 1,
608                                nodes: self.nodes.to_string(),
609                                subscriptions: 0,
610                                status: "Online".into(),
611                            },
612                        ],
613                        status: "Online".into(),
614                    };
615                    let _ = ctx.output().write(&info);
616                }
617            }
618        }
619
620        if !is_quiet {
621            ctx.output().info("Press Ctrl+C to stop");
622        }
623        ctx.shutdown_signal().notified().await;
624
625        self.stop_server(ctx).await?;
626        if !is_quiet {
627            ctx.output().success("OPC UA simulator stopped");
628        }
629
630        Ok(CommandOutput::quiet_success())
631    }
632}
633
634#[async_trait]
635impl ProtocolCommand for OpcuaCommand {
636    fn protocol(&self) -> Protocol {
637        Protocol::OpcUa
638    }
639
640    fn default_port(&self) -> u16 {
641        4840
642    }
643
644    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
645        let output = ctx.output();
646        let spinner = output.spinner("Starting OPC UA server...");
647
648        let config = OpcUaServerConfig {
649            endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
650            server_name: "Mabinogion OPC UA Simulator".to_string(),
651            max_subscriptions: 1000,
652            max_monitored_items: 10000,
653            ..Default::default()
654        };
655
656        let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
657            crate::error::CliError::ExecutionFailed {
658                message: format!("Failed to create OPC UA server: {}", e),
659            }
660        })?);
661
662        // Add sample nodes with diverse types and mixed read-only / writable access.
663        // Even-indexed nodes are writable, odd-indexed are read-only.
664        let node_count = self.nodes.min(100);
665        for i in 0..node_count {
666            let node_id = format!("ns=2;i={}", 1000 + i);
667            let name = format!("Variable_{}", i);
668            let value = (i as f64) * 0.1;
669
670            if i % 2 == 0 {
671                let _ = server.add_writable_variable(node_id, name, value);
672            } else {
673                let _ = server.add_variable(node_id, name, value);
674            }
675        }
676
677        {
678            let mut server_guard = self.server.lock().await;
679            *server_guard = Some(server.clone());
680        }
681
682        let server_clone = server.clone();
683        let task = tokio::spawn(async move {
684            if let Err(e) = server_clone.start().await {
685                tracing::error!("OPC UA server error: {}", e);
686            }
687        });
688
689        {
690            let mut task_guard = self.server_task.lock().await;
691            *task_guard = Some(task);
692        }
693
694        tokio::time::sleep(Duration::from_millis(100)).await;
695
696        spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
697        Ok(())
698    }
699
700    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
701        if let Some(server) = self.server.lock().await.as_ref() {
702            let _ = server.stop().await;
703        }
704
705        if let Some(task) = self.server_task.lock().await.take() {
706            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
707        }
708
709        Ok(())
710    }
711}
712
713// =============================================================================
714// BACnet Command
715// =============================================================================
716
717/// BACnet protocol command.
718pub struct BacnetCommand {
719    bind_addr: SocketAddr,
720    device_instance: u32,
721    objects: usize,
722    bbmd_enabled: bool,
723    /// Device tags.
724    tags: Tags,
725    /// Server instance (for shutdown).
726    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
727    /// Server task handle.
728    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
729}
730
731impl BacnetCommand {
732    pub fn new() -> Self {
733        Self {
734            bind_addr: "0.0.0.0:47808".parse().unwrap(),
735            device_instance: 1234,
736            objects: 100,
737            bbmd_enabled: false,
738            tags: Tags::new(),
739            server: Arc::new(Mutex::new(None)),
740            server_task: Arc::new(Mutex::new(None)),
741        }
742    }
743
744    pub fn with_port(mut self, port: u16) -> Self {
745        self.bind_addr.set_port(port);
746        self
747    }
748
749    pub fn with_device_instance(mut self, instance: u32) -> Self {
750        self.device_instance = instance;
751        self
752    }
753
754    pub fn with_objects(mut self, objects: usize) -> Self {
755        self.objects = objects;
756        self
757    }
758
759    pub fn with_bbmd(mut self, enabled: bool) -> Self {
760        self.bbmd_enabled = enabled;
761        self
762    }
763
764    pub fn with_tags(mut self, tags: Tags) -> Self {
765        self.tags = tags;
766        self
767    }
768}
769
770impl Default for BacnetCommand {
771    fn default() -> Self {
772        Self::new()
773    }
774}
775
776#[async_trait]
777impl Command for BacnetCommand {
778    fn name(&self) -> &str {
779        "bacnet"
780    }
781
782    fn description(&self) -> &str {
783        "Start a BACnet/IP simulator"
784    }
785
786    fn requires_engine(&self) -> bool {
787        true
788    }
789
790    fn supports_shutdown(&self) -> bool {
791        true
792    }
793
794    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
795        let format = ctx.output().format();
796        let is_quiet = ctx.is_quiet();
797        let is_verbose = ctx.is_verbose();
798        let is_debug = ctx.is_debug();
799
800        if !is_quiet {
801            if matches!(format, OutputFormat::Table) {
802                let output = ctx.output();
803                output.header("BACnet/IP Simulator");
804                output.kv("Bind Address", self.bind_addr);
805                output.kv("Device Instance", self.device_instance);
806                output.kv("Objects", self.objects);
807                output.kv(
808                    "BBMD",
809                    if self.bbmd_enabled {
810                        "Enabled"
811                    } else {
812                        "Disabled"
813                    },
814                );
815            }
816        }
817
818        // Verbose: show extra details
819        if is_verbose {
820            let per_type = self.objects / 4;
821            ctx.vprintln(format!(
822                "  Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})",
823                per_type, per_type, per_type, per_type, per_type
824            ));
825            ctx.vprintln(format!("  Device Name: Mabinogion BACnet Simulator"));
826        }
827
828        // Debug: dump full configuration
829        if is_debug {
830            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
831            ctx.dprintln(format!("Device instance: {}", self.device_instance));
832            ctx.dprintln(format!(
833                "Total objects: {}, BBMD: {}",
834                self.objects, self.bbmd_enabled
835            ));
836        }
837
838        self.start_server(ctx).await?;
839
840        let per_type = self.objects / 4;
841
842        if !is_quiet {
843            match format {
844                OutputFormat::Table => {
845                    let colors_enabled = ctx.colors_enabled();
846                    let table = TableBuilder::new(colors_enabled)
847                        .header(["Object Type", "Count", "Status"])
848                        .status_row(["Device", "1", "Online"], StatusType::Success)
849                        .status_row(
850                            ["Analog Input", &per_type.to_string(), "Active"],
851                            StatusType::Success,
852                        )
853                        .status_row(
854                            ["Analog Output", &per_type.to_string(), "Active"],
855                            StatusType::Success,
856                        )
857                        .status_row(
858                            ["Binary Input", &per_type.to_string(), "Active"],
859                            StatusType::Success,
860                        )
861                        .status_row(
862                            ["Binary Output", &per_type.to_string(), "Active"],
863                            StatusType::Success,
864                        );
865                    table.print();
866                }
867                _ => {
868                    #[derive(Serialize)]
869                    struct BacnetServerInfo {
870                        protocol: String,
871                        bind_address: String,
872                        device_instance: u32,
873                        objects: usize,
874                        bbmd_enabled: bool,
875                        object_types: Vec<ObjectTypeInfo>,
876                        status: String,
877                    }
878                    #[derive(Serialize)]
879                    struct ObjectTypeInfo {
880                        object_type: String,
881                        count: usize,
882                        status: String,
883                    }
884                    let info = BacnetServerInfo {
885                        protocol: "BACnet/IP".into(),
886                        bind_address: self.bind_addr.to_string(),
887                        device_instance: self.device_instance,
888                        objects: self.objects,
889                        bbmd_enabled: self.bbmd_enabled,
890                        object_types: vec![
891                            ObjectTypeInfo {
892                                object_type: "Device".into(),
893                                count: 1,
894                                status: "Online".into(),
895                            },
896                            ObjectTypeInfo {
897                                object_type: "Analog Input".into(),
898                                count: per_type,
899                                status: "Active".into(),
900                            },
901                            ObjectTypeInfo {
902                                object_type: "Analog Output".into(),
903                                count: per_type,
904                                status: "Active".into(),
905                            },
906                            ObjectTypeInfo {
907                                object_type: "Binary Input".into(),
908                                count: per_type,
909                                status: "Active".into(),
910                            },
911                            ObjectTypeInfo {
912                                object_type: "Binary Output".into(),
913                                count: per_type,
914                                status: "Active".into(),
915                            },
916                        ],
917                        status: "Online".into(),
918                    };
919                    let _ = ctx.output().write(&info);
920                }
921            }
922        }
923
924        if !is_quiet {
925            ctx.output().info("Press Ctrl+C to stop");
926        }
927        ctx.shutdown_signal().notified().await;
928
929        self.stop_server(ctx).await?;
930        if !is_quiet {
931            ctx.output().success("BACnet simulator stopped");
932        }
933
934        Ok(CommandOutput::quiet_success())
935    }
936}
937
938#[async_trait]
939impl ProtocolCommand for BacnetCommand {
940    fn protocol(&self) -> Protocol {
941        Protocol::BacnetIp
942    }
943
944    fn default_port(&self) -> u16 {
945        47808
946    }
947
948    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
949        let output = ctx.output();
950        let spinner = output.spinner("Starting BACnet server...");
951
952        let config = BacnetServerConfig::new(self.device_instance)
953            .with_bind_addr(self.bind_addr)
954            .with_device_name("Mabinogion BACnet Simulator");
955
956        // Create object registry with sample objects
957        let registry = ObjectRegistry::new();
958
959        let descriptors = default_object_descriptors();
960        let objects_per_type = self.objects / descriptors.len();
961        registry.populate_standard_objects(&descriptors, objects_per_type);
962
963        let server = Arc::new(BACnetServer::new(config, registry));
964
965        {
966            let mut server_guard = self.server.lock().await;
967            *server_guard = Some(server.clone());
968        }
969
970        let server_clone = server.clone();
971        let task = tokio::spawn(async move {
972            if let Err(e) = server_clone.run().await {
973                tracing::error!("BACnet server error: {}", e);
974            }
975        });
976
977        {
978            let mut task_guard = self.server_task.lock().await;
979            *task_guard = Some(task);
980        }
981
982        tokio::time::sleep(Duration::from_millis(100)).await;
983
984        spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
985        Ok(())
986    }
987
988    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
989        if let Some(server) = self.server.lock().await.as_ref() {
990            server.shutdown();
991        }
992
993        if let Some(task) = self.server_task.lock().await.take() {
994            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
995        }
996
997        Ok(())
998    }
999}
1000
1001// =============================================================================
1002// KNX Command
1003// =============================================================================
1004
1005/// KNX protocol command.
1006pub struct KnxCommand {
1007    bind_addr: SocketAddr,
1008    individual_address: String,
1009    group_objects: usize,
1010    /// Device tags.
1011    tags: Tags,
1012    /// Server instance (for shutdown).
1013    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
1014    /// Server task handle.
1015    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
1016}
1017
1018impl KnxCommand {
1019    pub fn new() -> Self {
1020        Self {
1021            bind_addr: "0.0.0.0:3671".parse().unwrap(),
1022            individual_address: "1.1.1".into(),
1023            group_objects: 100,
1024            tags: Tags::new(),
1025            server: Arc::new(Mutex::new(None)),
1026            server_task: Arc::new(Mutex::new(None)),
1027        }
1028    }
1029
1030    pub fn with_port(mut self, port: u16) -> Self {
1031        self.bind_addr.set_port(port);
1032        self
1033    }
1034
1035    pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
1036        self.individual_address = addr.into();
1037        self
1038    }
1039
1040    pub fn with_group_objects(mut self, count: usize) -> Self {
1041        self.group_objects = count;
1042        self
1043    }
1044
1045    pub fn with_tags(mut self, tags: Tags) -> Self {
1046        self.tags = tags;
1047        self
1048    }
1049}
1050
1051impl Default for KnxCommand {
1052    fn default() -> Self {
1053        Self::new()
1054    }
1055}
1056
1057#[async_trait]
1058impl Command for KnxCommand {
1059    fn name(&self) -> &str {
1060        "knx"
1061    }
1062
1063    fn description(&self) -> &str {
1064        "Start a KNXnet/IP simulator"
1065    }
1066
1067    fn requires_engine(&self) -> bool {
1068        true
1069    }
1070
1071    fn supports_shutdown(&self) -> bool {
1072        true
1073    }
1074
1075    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
1076        let format = ctx.output().format();
1077        let is_quiet = ctx.is_quiet();
1078        let is_verbose = ctx.is_verbose();
1079        let is_debug = ctx.is_debug();
1080
1081        if !is_quiet {
1082            if matches!(format, OutputFormat::Table) {
1083                let output = ctx.output();
1084                output.header("KNXnet/IP Simulator");
1085                output.kv("Bind Address", self.bind_addr);
1086                output.kv("Individual Address", &self.individual_address);
1087                output.kv("Group Objects", self.group_objects);
1088            }
1089        }
1090
1091        // Verbose: show extra details
1092        if is_verbose {
1093            ctx.vprintln(format!("  Max Connections: 10"));
1094            ctx.vprintln(format!("  Services: Core, Device Management, Tunneling"));
1095        }
1096
1097        // Debug: dump full configuration
1098        if is_debug {
1099            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
1100            ctx.dprintln(format!("Individual address: {}", self.individual_address));
1101            ctx.dprintln(format!("Group objects: {}", self.group_objects));
1102        }
1103
1104        self.start_server(ctx).await?;
1105
1106        if !is_quiet {
1107            match format {
1108                OutputFormat::Table => {
1109                    let colors_enabled = ctx.colors_enabled();
1110                    let table = TableBuilder::new(colors_enabled)
1111                        .header(["Service", "Status"])
1112                        .status_row(["Core", "Ready"], StatusType::Success)
1113                        .status_row(["Device Management", "Ready"], StatusType::Success)
1114                        .status_row(["Tunneling", "Ready"], StatusType::Success);
1115                    table.print();
1116                }
1117                _ => {
1118                    #[derive(Serialize)]
1119                    struct KnxServerInfo {
1120                        protocol: String,
1121                        bind_address: String,
1122                        individual_address: String,
1123                        group_objects: usize,
1124                        services: Vec<ServiceInfo>,
1125                        status: String,
1126                    }
1127                    #[derive(Serialize)]
1128                    struct ServiceInfo {
1129                        service: String,
1130                        status: String,
1131                    }
1132                    let info = KnxServerInfo {
1133                        protocol: "KNXnet/IP".into(),
1134                        bind_address: self.bind_addr.to_string(),
1135                        individual_address: self.individual_address.clone(),
1136                        group_objects: self.group_objects,
1137                        services: vec![
1138                            ServiceInfo {
1139                                service: "Core".into(),
1140                                status: "Ready".into(),
1141                            },
1142                            ServiceInfo {
1143                                service: "Device Management".into(),
1144                                status: "Ready".into(),
1145                            },
1146                            ServiceInfo {
1147                                service: "Tunneling".into(),
1148                                status: "Ready".into(),
1149                            },
1150                        ],
1151                        status: "Online".into(),
1152                    };
1153                    let _ = ctx.output().write(&info);
1154                }
1155            }
1156        }
1157
1158        if !is_quiet {
1159            ctx.output().info("Press Ctrl+C to stop");
1160        }
1161        ctx.shutdown_signal().notified().await;
1162
1163        self.stop_server(ctx).await?;
1164        if !is_quiet {
1165            ctx.output().success("KNX simulator stopped");
1166        }
1167
1168        Ok(CommandOutput::quiet_success())
1169    }
1170}
1171
1172#[async_trait]
1173impl ProtocolCommand for KnxCommand {
1174    fn protocol(&self) -> Protocol {
1175        Protocol::KnxIp
1176    }
1177
1178    fn default_port(&self) -> u16 {
1179        3671
1180    }
1181
1182    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1183        let output = ctx.output();
1184        let spinner = output.spinner("Starting KNX server...");
1185
1186        // Parse individual address
1187        let individual_address: IndividualAddress =
1188            self.individual_address.parse().map_err(|_| {
1189                crate::error::CliError::ExecutionFailed {
1190                    message: format!("Invalid individual address: {}", self.individual_address),
1191                }
1192            })?;
1193
1194        let config = KnxServerConfig {
1195            bind_addr: self.bind_addr,
1196            individual_address,
1197            max_connections: 256,
1198            ..Default::default()
1199        };
1200
1201        // Create group objects based on --groups parameter
1202        let group_table = Arc::new(GroupObjectTable::new());
1203        let dpt_types = [
1204            DptId::new(1, 1),   // Switch (bool)
1205            DptId::new(5, 1),   // Scaling (0-100%)
1206            DptId::new(9, 1),   // Temperature (float16)
1207            DptId::new(9, 4),   // Lux
1208            DptId::new(9, 7),   // Humidity
1209            DptId::new(12, 1),  // Counter (u32)
1210            DptId::new(13, 1),  // Counter signed (i32)
1211            DptId::new(14, 56), // Float (f32)
1212        ];
1213        let dpt_names = [
1214            "Switch",
1215            "Scaling",
1216            "Temperature",
1217            "Lux",
1218            "Humidity",
1219            "Counter",
1220            "SignedCounter",
1221            "Float",
1222        ];
1223
1224        for i in 0..self.group_objects {
1225            let main = ((i / 256) + 1) as u8;
1226            let middle = ((i / 8) % 8) as u8;
1227            let sub = (i % 256) as u8;
1228            let addr = GroupAddress::three_level(main, middle, sub);
1229            let dpt_idx = i % dpt_types.len();
1230            let name = format!("{}_{}", dpt_names[dpt_idx], i);
1231            if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1232                tracing::warn!("Failed to create group object {}: {}", i, e);
1233            }
1234        }
1235
1236        let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1237
1238        {
1239            let mut server_guard = self.server.lock().await;
1240            *server_guard = Some(server.clone());
1241        }
1242
1243        let server_clone = server.clone();
1244        let task = tokio::spawn(async move {
1245            if let Err(e) = server_clone.start().await {
1246                tracing::error!("KNX server error: {}", e);
1247            }
1248        });
1249
1250        {
1251            let mut task_guard = self.server_task.lock().await;
1252            *task_guard = Some(task);
1253        }
1254
1255        tokio::time::sleep(Duration::from_millis(100)).await;
1256
1257        spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1258        Ok(())
1259    }
1260
1261    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1262        // Take server out to call stop (KnxServer::stop has Send issues with parking_lot)
1263        let server_opt = self.server.lock().await.take();
1264        if let Some(server) = server_opt {
1265            // Use spawn_blocking to handle the non-Send future
1266            let _ = tokio::task::spawn_blocking(move || {
1267                let rt = tokio::runtime::Handle::current();
1268                rt.block_on(async {
1269                    let _ = server.stop().await;
1270                })
1271            })
1272            .await;
1273        }
1274
1275        if let Some(task) = self.server_task.lock().await.take() {
1276            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1277        }
1278
1279        Ok(())
1280    }
1281}