Skip to main content

mabi_cli/commands/
protocol.rs

1//! Protocol-specific commands.
2//!
3//! Provides subcommands for each supported protocol.
4
5use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19// Protocol-specific imports
20use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
21use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
22use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, default_object_descriptors};
23use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
24
25/// Checks if a port is already in use and provides diagnostics.
26///
27/// This is an advisory check — it warns but does not block. If the port is held by
28/// a zombie/suspended process, it provides specific diagnostic instructions.
29async fn check_port_availability(addr: SocketAddr) {
30    use tokio::io::{AsyncReadExt, AsyncWriteExt};
31    use tokio::net::TcpStream;
32
33    // Try to connect to the port
34    let connect_result =
35        tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await;
36
37    match connect_result {
38        Ok(Ok(_first_stream)) => {
39            // Port is in use — something is listening. Try a Modbus probe.
40            drop(_first_stream);
41
42            let probe_result = tokio::time::timeout(Duration::from_secs(1), async {
43                let mut stream = TcpStream::connect(addr).await?;
44                // Send Modbus TCP ReadHoldingRegisters: txn=1, proto=0, len=6, unit=1, fc=3, addr=0, count=1
45                let request: [u8; 12] = [
46                    0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x03, 0x00, 0x00, 0x00, 0x01,
47                ];
48                stream.write_all(&request).await?;
49                let mut response = [0u8; 32];
50                let n = stream.read(&mut response).await?;
51                Ok::<_, std::io::Error>(n)
52            })
53            .await;
54
55            match probe_result {
56                Ok(Ok(n)) if n >= 7 => {
57                    tracing::warn!(
58                        port = addr.port(),
59                        "Port {} is already in use by a responding Modbus server. \
60                         The new server will fail to bind.",
61                        addr.port()
62                    );
63                }
64                _ => {
65                    // TCP connects but Modbus doesn't respond — possible zombie
66                    tracing::warn!(
67                        port = addr.port(),
68                        "Port {} is in use: TCP connects but no Modbus response. \
69                         This may be a suspended (zombie) process holding the port.\n  \
70                         Diagnostic: lsof -i :{} | grep LISTEN\n  \
71                         To kill:    kill $(lsof -ti :{} -sTCP:LISTEN)",
72                        addr.port(),
73                        addr.port(),
74                        addr.port()
75                    );
76                }
77            }
78        }
79        Ok(Err(_)) | Err(_) => {
80            // Port is available (connection refused or timeout) — good
81            tracing::debug!(port = addr.port(), "Port {} is available", addr.port());
82        }
83    }
84}
85
86/// Base trait for protocol-specific commands.
87#[async_trait]
88pub trait ProtocolCommand: Command {
89    /// Get the protocol type.
90    fn protocol(&self) -> Protocol;
91
92    /// Get the default port.
93    fn default_port(&self) -> u16;
94
95    /// Start the protocol server.
96    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
97
98    /// Stop the protocol server.
99    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
100}
101
102// =============================================================================
103// Modbus Command
104// =============================================================================
105
106/// Modbus protocol command.
107pub struct ModbusCommand {
108    /// Binding address.
109    bind_addr: SocketAddr,
110    /// Number of devices to simulate.
111    devices: usize,
112    /// Points per device.
113    points_per_device: usize,
114    /// Use RTU mode instead of TCP.
115    rtu_mode: bool,
116    /// Serial port for RTU mode.
117    serial_port: Option<String>,
118    /// Device tags.
119    tags: Tags,
120    /// Server instance (for shutdown).
121    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
122    /// Server task handle.
123    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
124}
125
126impl ModbusCommand {
127    pub fn new() -> Self {
128        Self {
129            bind_addr: "0.0.0.0:502".parse().unwrap(),
130            devices: 1,
131            points_per_device: 100,
132            rtu_mode: false,
133            serial_port: None,
134            tags: Tags::new(),
135            server: Arc::new(Mutex::new(None)),
136            server_task: Arc::new(Mutex::new(None)),
137        }
138    }
139
140    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
141        self.bind_addr = addr;
142        self
143    }
144
145    pub fn with_port(mut self, port: u16) -> Self {
146        self.bind_addr.set_port(port);
147        self
148    }
149
150    pub fn with_devices(mut self, devices: usize) -> Self {
151        self.devices = devices;
152        self
153    }
154
155    pub fn with_points(mut self, points: usize) -> Self {
156        self.points_per_device = points;
157        self
158    }
159
160    pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
161        self.rtu_mode = true;
162        self.serial_port = Some(serial_port.into());
163        self
164    }
165
166    pub fn with_tags(mut self, tags: Tags) -> Self {
167        self.tags = tags;
168        self
169    }
170}
171
172impl Default for ModbusCommand {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
178#[async_trait]
179impl Command for ModbusCommand {
180    fn name(&self) -> &str {
181        "modbus"
182    }
183
184    fn description(&self) -> &str {
185        "Start a Modbus TCP/RTU simulator"
186    }
187
188    fn requires_engine(&self) -> bool {
189        true
190    }
191
192    fn supports_shutdown(&self) -> bool {
193        true
194    }
195
196    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
197        let format = ctx.output().format();
198        let is_quiet = ctx.is_quiet();
199        let is_verbose = ctx.is_verbose();
200        let is_debug = ctx.is_debug();
201
202        if !is_quiet {
203            if matches!(format, OutputFormat::Table) {
204                let output = ctx.output();
205                if self.rtu_mode {
206                    output.header("Modbus RTU Simulator");
207                    output.kv(
208                        "Serial Port",
209                        self.serial_port.as_deref().unwrap_or("N/A"),
210                    );
211                } else {
212                    output.header("Modbus TCP Simulator");
213                    output.kv("Bind Address", self.bind_addr);
214                }
215                output.kv("Devices", self.devices);
216                output.kv("Points per Device", self.points_per_device);
217                output.kv("Total Points", self.devices * self.points_per_device);
218            }
219        }
220
221        // Verbose: show extra configuration details
222        if is_verbose {
223            ctx.vprintln(format!("  Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
224            ctx.vprintln(format!("  Points Distribution: {} per register type", self.points_per_device / 4));
225        }
226
227        // Debug: dump full configuration
228        if is_debug {
229            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
230            ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
231            ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
232        }
233
234        self.start_server(ctx).await?;
235
236        let points_per_type = self.points_per_device / 4;
237
238        if !is_quiet {
239            match format {
240                OutputFormat::Table => {
241                    let colors_enabled = ctx.colors_enabled();
242                    let builder = TableBuilder::new(colors_enabled)
243                        .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
244
245                    let devices = self.devices;
246                    let pts = points_per_type.to_string();
247                    let table = PaginatedTable::default().render(builder, devices, 6, |i| {
248                        let unit_id = (i + 1).to_string();
249                        (
250                            vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
251                            StatusType::Success,
252                        )
253                    });
254                    table.print();
255                }
256                _ => {
257                    #[derive(Serialize)]
258                    struct ModbusServerInfo {
259                        protocol: String,
260                        bind_address: String,
261                        devices: usize,
262                        points_per_device: usize,
263                        total_points: usize,
264                        rtu_mode: bool,
265                        serial_port: Option<String>,
266                        device_list: Vec<ModbusDeviceInfo>,
267                        status: String,
268                    }
269                    #[derive(Serialize)]
270                    struct ModbusDeviceInfo {
271                        unit_id: usize,
272                        holding_registers: usize,
273                        input_registers: usize,
274                        coils: usize,
275                        discrete_inputs: usize,
276                        status: String,
277                    }
278                    let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
279                        .map(|i| ModbusDeviceInfo {
280                            unit_id: i + 1,
281                            holding_registers: points_per_type,
282                            input_registers: points_per_type,
283                            coils: points_per_type,
284                            discrete_inputs: points_per_type,
285                            status: "Online".into(),
286                        })
287                        .collect();
288                    let info = ModbusServerInfo {
289                        protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
290                        bind_address: self.bind_addr.to_string(),
291                        devices: self.devices,
292                        points_per_device: self.points_per_device,
293                        total_points: self.devices * self.points_per_device,
294                        rtu_mode: self.rtu_mode,
295                        serial_port: self.serial_port.clone(),
296                        device_list,
297                        status: "Online".into(),
298                    };
299                    let _ = ctx.output().write(&info);
300                }
301            }
302        }
303
304        if !is_quiet {
305            ctx.output().info("Press Ctrl+C to stop");
306        }
307        ctx.shutdown_signal().notified().await;
308
309        self.stop_server(ctx).await?;
310        if !is_quiet {
311            ctx.output().success("Modbus simulator stopped");
312        }
313
314        Ok(CommandOutput::quiet_success())
315    }
316}
317
318#[async_trait]
319impl ProtocolCommand for ModbusCommand {
320    fn protocol(&self) -> Protocol {
321        if self.rtu_mode {
322            Protocol::ModbusRtu
323        } else {
324            Protocol::ModbusTcp
325        }
326    }
327
328    fn default_port(&self) -> u16 {
329        502
330    }
331
332    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
333        let output = ctx.output();
334
335        // Advisory port availability check before starting
336        check_port_availability(self.bind_addr).await;
337
338        let spinner = output.spinner("Starting Modbus server...");
339
340        let config = ServerConfigV2 {
341            bind_address: self.bind_addr,
342            ..Default::default()
343        };
344
345        let server = Arc::new(ModbusTcpServerV2::new(config));
346
347        for i in 0..self.devices {
348            let unit_id = (i + 1) as u8;
349            let points = (self.points_per_device / 4) as u16;
350            let device_config = ModbusDeviceConfig {
351                unit_id,
352                name: format!("Device-{}", unit_id),
353                holding_registers: points,
354                input_registers: points,
355                coils: points,
356                discrete_inputs: points,
357                response_delay_ms: 0,
358                tags: self.tags.clone(),
359            };
360            let device = ModbusDevice::new(device_config);
361            server.add_device(device);
362        }
363
364        {
365            let mut server_guard = self.server.lock().await;
366            *server_guard = Some(server.clone());
367        }
368
369        let server_clone = server.clone();
370        let task = tokio::spawn(async move {
371            if let Err(e) = server_clone.run().await {
372                tracing::error!("Modbus server error: {}", e);
373            }
374        });
375
376        {
377            let mut task_guard = self.server_task.lock().await;
378            *task_guard = Some(task);
379        }
380
381        tokio::time::sleep(Duration::from_millis(100)).await;
382
383        // Check if server task exited early (likely a bind failure / port-in-use)
384        {
385            let task_guard = self.server_task.lock().await;
386            if let Some(task) = task_guard.as_ref() {
387                if task.is_finished() {
388                    spinner.finish_with_message("Failed to start server");
389                    return Err(crate::error::CliError::PortInUse {
390                        port: self.bind_addr.port(),
391                    });
392                }
393            }
394        }
395
396        spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
397        Ok(())
398    }
399
400    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
401        if let Some(server) = self.server.lock().await.as_ref() {
402            server.shutdown();
403        }
404
405        if let Some(task) = self.server_task.lock().await.take() {
406            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
407        }
408
409        Ok(())
410    }
411}
412
413// =============================================================================
414// OPC UA Command
415// =============================================================================
416
417/// OPC UA protocol command.
418pub struct OpcuaCommand {
419    bind_addr: SocketAddr,
420    endpoint_path: String,
421    nodes: usize,
422    security_mode: String,
423    /// Device tags.
424    tags: Tags,
425    /// Server instance (for shutdown).
426    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
427    /// Server task handle.
428    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
429}
430
431impl OpcuaCommand {
432    pub fn new() -> Self {
433        Self {
434            bind_addr: "0.0.0.0:4840".parse().unwrap(),
435            endpoint_path: "/".into(),
436            nodes: 1000,
437            security_mode: "None".into(),
438            tags: Tags::new(),
439            server: Arc::new(Mutex::new(None)),
440            server_task: Arc::new(Mutex::new(None)),
441        }
442    }
443
444    pub fn with_port(mut self, port: u16) -> Self {
445        self.bind_addr.set_port(port);
446        self
447    }
448
449    pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
450        self.endpoint_path = path.into();
451        self
452    }
453
454    pub fn with_nodes(mut self, nodes: usize) -> Self {
455        self.nodes = nodes;
456        self
457    }
458
459    pub fn with_security(mut self, mode: impl Into<String>) -> Self {
460        self.security_mode = mode.into();
461        self
462    }
463
464    pub fn with_tags(mut self, tags: Tags) -> Self {
465        self.tags = tags;
466        self
467    }
468}
469
470impl Default for OpcuaCommand {
471    fn default() -> Self {
472        Self::new()
473    }
474}
475
476#[async_trait]
477impl Command for OpcuaCommand {
478    fn name(&self) -> &str {
479        "opcua"
480    }
481
482    fn description(&self) -> &str {
483        "Start an OPC UA server simulator"
484    }
485
486    fn requires_engine(&self) -> bool {
487        true
488    }
489
490    fn supports_shutdown(&self) -> bool {
491        true
492    }
493
494    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
495        let format = ctx.output().format();
496        let is_quiet = ctx.is_quiet();
497        let is_verbose = ctx.is_verbose();
498        let is_debug = ctx.is_debug();
499
500        if !is_quiet {
501            if matches!(format, OutputFormat::Table) {
502                let output = ctx.output();
503                output.header("OPC UA Simulator");
504                output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
505                output.kv("Nodes", self.nodes);
506                output.kv("Security Mode", &self.security_mode);
507            }
508        }
509
510        // Verbose: show extra details
511        if is_verbose {
512            ctx.vprintln(format!("  Bind Address: {}", self.bind_addr));
513            ctx.vprintln(format!("  Endpoint Path: {}", self.endpoint_path));
514            ctx.vprintln(format!("  Max Subscriptions: 1000"));
515            ctx.vprintln(format!("  Max Monitored Items: 10000"));
516        }
517
518        // Debug: dump full configuration
519        if is_debug {
520            ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
521            ctx.dprintln(format!("Node count: {}", self.nodes));
522            ctx.dprintln(format!("Security mode: {}", self.security_mode));
523            ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
524        }
525
526        self.start_server(ctx).await?;
527
528        if !is_quiet {
529            match format {
530                OutputFormat::Table => {
531                    let colors_enabled = ctx.colors_enabled();
532                    let table = TableBuilder::new(colors_enabled)
533                        .header(["Namespace", "Nodes", "Subscriptions", "Status"])
534                        .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
535                        .status_row(
536                            ["1", &self.nodes.to_string(), "0", "Online"],
537                            StatusType::Success,
538                        );
539                    table.print();
540                }
541                _ => {
542                    #[derive(Serialize)]
543                    struct OpcuaServerInfo {
544                        protocol: String,
545                        endpoint: String,
546                        nodes: usize,
547                        security_mode: String,
548                        namespaces: Vec<NamespaceInfo>,
549                        status: String,
550                    }
551                    #[derive(Serialize)]
552                    struct NamespaceInfo {
553                        index: u32,
554                        nodes: String,
555                        subscriptions: u32,
556                        status: String,
557                    }
558                    let info = OpcuaServerInfo {
559                        protocol: "OPC UA".into(),
560                        endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
561                        nodes: self.nodes,
562                        security_mode: self.security_mode.clone(),
563                        namespaces: vec![
564                            NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
565                            NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
566                        ],
567                        status: "Online".into(),
568                    };
569                    let _ = ctx.output().write(&info);
570                }
571            }
572        }
573
574        if !is_quiet {
575            ctx.output().info("Press Ctrl+C to stop");
576        }
577        ctx.shutdown_signal().notified().await;
578
579        self.stop_server(ctx).await?;
580        if !is_quiet {
581            ctx.output().success("OPC UA simulator stopped");
582        }
583
584        Ok(CommandOutput::quiet_success())
585    }
586}
587
588#[async_trait]
589impl ProtocolCommand for OpcuaCommand {
590    fn protocol(&self) -> Protocol {
591        Protocol::OpcUa
592    }
593
594    fn default_port(&self) -> u16 {
595        4840
596    }
597
598    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
599        let output = ctx.output();
600        let spinner = output.spinner("Starting OPC UA server...");
601
602        let config = OpcUaServerConfig {
603            endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
604            server_name: "Mabinogion OPC UA Simulator".to_string(),
605            max_subscriptions: 1000,
606            max_monitored_items: 10000,
607            ..Default::default()
608        };
609
610        let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
611            crate::error::CliError::ExecutionFailed {
612                message: format!("Failed to create OPC UA server: {}", e)
613            }
614        })?);
615
616        // Add sample nodes with diverse types and mixed read-only / writable access.
617        // Even-indexed nodes are writable, odd-indexed are read-only.
618        let node_count = self.nodes.min(100);
619        for i in 0..node_count {
620            let node_id = format!("ns=2;i={}", 1000 + i);
621            let name = format!("Variable_{}", i);
622            let value = (i as f64) * 0.1;
623
624            if i % 2 == 0 {
625                let _ = server.add_writable_variable(node_id, name, value);
626            } else {
627                let _ = server.add_variable(node_id, name, value);
628            }
629        }
630
631        {
632            let mut server_guard = self.server.lock().await;
633            *server_guard = Some(server.clone());
634        }
635
636        let server_clone = server.clone();
637        let task = tokio::spawn(async move {
638            if let Err(e) = server_clone.start().await {
639                tracing::error!("OPC UA server error: {}", e);
640            }
641        });
642
643        {
644            let mut task_guard = self.server_task.lock().await;
645            *task_guard = Some(task);
646        }
647
648        tokio::time::sleep(Duration::from_millis(100)).await;
649
650        spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
651        Ok(())
652    }
653
654    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
655        if let Some(server) = self.server.lock().await.as_ref() {
656            let _ = server.stop().await;
657        }
658
659        if let Some(task) = self.server_task.lock().await.take() {
660            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
661        }
662
663        Ok(())
664    }
665}
666
667// =============================================================================
668// BACnet Command
669// =============================================================================
670
671/// BACnet protocol command.
672pub struct BacnetCommand {
673    bind_addr: SocketAddr,
674    device_instance: u32,
675    objects: usize,
676    bbmd_enabled: bool,
677    /// Device tags.
678    tags: Tags,
679    /// Server instance (for shutdown).
680    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
681    /// Server task handle.
682    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
683}
684
685impl BacnetCommand {
686    pub fn new() -> Self {
687        Self {
688            bind_addr: "0.0.0.0:47808".parse().unwrap(),
689            device_instance: 1234,
690            objects: 100,
691            bbmd_enabled: false,
692            tags: Tags::new(),
693            server: Arc::new(Mutex::new(None)),
694            server_task: Arc::new(Mutex::new(None)),
695        }
696    }
697
698    pub fn with_port(mut self, port: u16) -> Self {
699        self.bind_addr.set_port(port);
700        self
701    }
702
703    pub fn with_device_instance(mut self, instance: u32) -> Self {
704        self.device_instance = instance;
705        self
706    }
707
708    pub fn with_objects(mut self, objects: usize) -> Self {
709        self.objects = objects;
710        self
711    }
712
713    pub fn with_bbmd(mut self, enabled: bool) -> Self {
714        self.bbmd_enabled = enabled;
715        self
716    }
717
718    pub fn with_tags(mut self, tags: Tags) -> Self {
719        self.tags = tags;
720        self
721    }
722}
723
724impl Default for BacnetCommand {
725    fn default() -> Self {
726        Self::new()
727    }
728}
729
730#[async_trait]
731impl Command for BacnetCommand {
732    fn name(&self) -> &str {
733        "bacnet"
734    }
735
736    fn description(&self) -> &str {
737        "Start a BACnet/IP simulator"
738    }
739
740    fn requires_engine(&self) -> bool {
741        true
742    }
743
744    fn supports_shutdown(&self) -> bool {
745        true
746    }
747
748    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
749        let format = ctx.output().format();
750        let is_quiet = ctx.is_quiet();
751        let is_verbose = ctx.is_verbose();
752        let is_debug = ctx.is_debug();
753
754        if !is_quiet {
755            if matches!(format, OutputFormat::Table) {
756                let output = ctx.output();
757                output.header("BACnet/IP Simulator");
758                output.kv("Bind Address", self.bind_addr);
759                output.kv("Device Instance", self.device_instance);
760                output.kv("Objects", self.objects);
761                output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
762            }
763        }
764
765        // Verbose: show extra details
766        if is_verbose {
767            let per_type = self.objects / 4;
768            ctx.vprintln(format!("  Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
769            ctx.vprintln(format!("  Device Name: Mabinogion BACnet Simulator"));
770        }
771
772        // Debug: dump full configuration
773        if is_debug {
774            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
775            ctx.dprintln(format!("Device instance: {}", self.device_instance));
776            ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
777        }
778
779        self.start_server(ctx).await?;
780
781        let per_type = self.objects / 4;
782
783        if !is_quiet {
784            match format {
785                OutputFormat::Table => {
786                    let colors_enabled = ctx.colors_enabled();
787                    let table = TableBuilder::new(colors_enabled)
788                        .header(["Object Type", "Count", "Status"])
789                        .status_row(["Device", "1", "Online"], StatusType::Success)
790                        .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
791                        .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
792                        .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
793                        .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
794                    table.print();
795                }
796                _ => {
797                    #[derive(Serialize)]
798                    struct BacnetServerInfo {
799                        protocol: String,
800                        bind_address: String,
801                        device_instance: u32,
802                        objects: usize,
803                        bbmd_enabled: bool,
804                        object_types: Vec<ObjectTypeInfo>,
805                        status: String,
806                    }
807                    #[derive(Serialize)]
808                    struct ObjectTypeInfo {
809                        object_type: String,
810                        count: usize,
811                        status: String,
812                    }
813                    let info = BacnetServerInfo {
814                        protocol: "BACnet/IP".into(),
815                        bind_address: self.bind_addr.to_string(),
816                        device_instance: self.device_instance,
817                        objects: self.objects,
818                        bbmd_enabled: self.bbmd_enabled,
819                        object_types: vec![
820                            ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
821                            ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
822                            ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
823                            ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
824                            ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
825                        ],
826                        status: "Online".into(),
827                    };
828                    let _ = ctx.output().write(&info);
829                }
830            }
831        }
832
833        if !is_quiet {
834            ctx.output().info("Press Ctrl+C to stop");
835        }
836        ctx.shutdown_signal().notified().await;
837
838        self.stop_server(ctx).await?;
839        if !is_quiet {
840            ctx.output().success("BACnet simulator stopped");
841        }
842
843        Ok(CommandOutput::quiet_success())
844    }
845}
846
847#[async_trait]
848impl ProtocolCommand for BacnetCommand {
849    fn protocol(&self) -> Protocol {
850        Protocol::BacnetIp
851    }
852
853    fn default_port(&self) -> u16 {
854        47808
855    }
856
857    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
858        let output = ctx.output();
859        let spinner = output.spinner("Starting BACnet server...");
860
861        let config = BacnetServerConfig::new(self.device_instance)
862            .with_bind_addr(self.bind_addr)
863            .with_device_name("Mabinogion BACnet Simulator");
864
865        // Create object registry with sample objects
866        let registry = ObjectRegistry::new();
867
868        let descriptors = default_object_descriptors();
869        let objects_per_type = self.objects / descriptors.len();
870        registry.populate_standard_objects(&descriptors, objects_per_type);
871
872        let server = Arc::new(BACnetServer::new(config, registry));
873
874        {
875            let mut server_guard = self.server.lock().await;
876            *server_guard = Some(server.clone());
877        }
878
879        let server_clone = server.clone();
880        let task = tokio::spawn(async move {
881            if let Err(e) = server_clone.run().await {
882                tracing::error!("BACnet server error: {}", e);
883            }
884        });
885
886        {
887            let mut task_guard = self.server_task.lock().await;
888            *task_guard = Some(task);
889        }
890
891        tokio::time::sleep(Duration::from_millis(100)).await;
892
893        spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
894        Ok(())
895    }
896
897    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
898        if let Some(server) = self.server.lock().await.as_ref() {
899            server.shutdown();
900        }
901
902        if let Some(task) = self.server_task.lock().await.take() {
903            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
904        }
905
906        Ok(())
907    }
908}
909
910// =============================================================================
911// KNX Command
912// =============================================================================
913
914/// KNX protocol command.
915pub struct KnxCommand {
916    bind_addr: SocketAddr,
917    individual_address: String,
918    group_objects: usize,
919    /// Device tags.
920    tags: Tags,
921    /// Server instance (for shutdown).
922    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
923    /// Server task handle.
924    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
925}
926
927impl KnxCommand {
928    pub fn new() -> Self {
929        Self {
930            bind_addr: "0.0.0.0:3671".parse().unwrap(),
931            individual_address: "1.1.1".into(),
932            group_objects: 100,
933            tags: Tags::new(),
934            server: Arc::new(Mutex::new(None)),
935            server_task: Arc::new(Mutex::new(None)),
936        }
937    }
938
939    pub fn with_port(mut self, port: u16) -> Self {
940        self.bind_addr.set_port(port);
941        self
942    }
943
944    pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
945        self.individual_address = addr.into();
946        self
947    }
948
949    pub fn with_group_objects(mut self, count: usize) -> Self {
950        self.group_objects = count;
951        self
952    }
953
954    pub fn with_tags(mut self, tags: Tags) -> Self {
955        self.tags = tags;
956        self
957    }
958}
959
960impl Default for KnxCommand {
961    fn default() -> Self {
962        Self::new()
963    }
964}
965
966#[async_trait]
967impl Command for KnxCommand {
968    fn name(&self) -> &str {
969        "knx"
970    }
971
972    fn description(&self) -> &str {
973        "Start a KNXnet/IP simulator"
974    }
975
976    fn requires_engine(&self) -> bool {
977        true
978    }
979
980    fn supports_shutdown(&self) -> bool {
981        true
982    }
983
984    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
985        let format = ctx.output().format();
986        let is_quiet = ctx.is_quiet();
987        let is_verbose = ctx.is_verbose();
988        let is_debug = ctx.is_debug();
989
990        if !is_quiet {
991            if matches!(format, OutputFormat::Table) {
992                let output = ctx.output();
993                output.header("KNXnet/IP Simulator");
994                output.kv("Bind Address", self.bind_addr);
995                output.kv("Individual Address", &self.individual_address);
996                output.kv("Group Objects", self.group_objects);
997            }
998        }
999
1000        // Verbose: show extra details
1001        if is_verbose {
1002            ctx.vprintln(format!("  Max Connections: 10"));
1003            ctx.vprintln(format!("  Services: Core, Device Management, Tunneling"));
1004        }
1005
1006        // Debug: dump full configuration
1007        if is_debug {
1008            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
1009            ctx.dprintln(format!("Individual address: {}", self.individual_address));
1010            ctx.dprintln(format!("Group objects: {}", self.group_objects));
1011        }
1012
1013        self.start_server(ctx).await?;
1014
1015        if !is_quiet {
1016            match format {
1017                OutputFormat::Table => {
1018                    let colors_enabled = ctx.colors_enabled();
1019                    let table = TableBuilder::new(colors_enabled)
1020                        .header(["Service", "Status"])
1021                        .status_row(["Core", "Ready"], StatusType::Success)
1022                        .status_row(["Device Management", "Ready"], StatusType::Success)
1023                        .status_row(["Tunneling", "Ready"], StatusType::Success);
1024                    table.print();
1025                }
1026                _ => {
1027                    #[derive(Serialize)]
1028                    struct KnxServerInfo {
1029                        protocol: String,
1030                        bind_address: String,
1031                        individual_address: String,
1032                        group_objects: usize,
1033                        services: Vec<ServiceInfo>,
1034                        status: String,
1035                    }
1036                    #[derive(Serialize)]
1037                    struct ServiceInfo {
1038                        service: String,
1039                        status: String,
1040                    }
1041                    let info = KnxServerInfo {
1042                        protocol: "KNXnet/IP".into(),
1043                        bind_address: self.bind_addr.to_string(),
1044                        individual_address: self.individual_address.clone(),
1045                        group_objects: self.group_objects,
1046                        services: vec![
1047                            ServiceInfo { service: "Core".into(), status: "Ready".into() },
1048                            ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
1049                            ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
1050                        ],
1051                        status: "Online".into(),
1052                    };
1053                    let _ = ctx.output().write(&info);
1054                }
1055            }
1056        }
1057
1058        if !is_quiet {
1059            ctx.output().info("Press Ctrl+C to stop");
1060        }
1061        ctx.shutdown_signal().notified().await;
1062
1063        self.stop_server(ctx).await?;
1064        if !is_quiet {
1065            ctx.output().success("KNX simulator stopped");
1066        }
1067
1068        Ok(CommandOutput::quiet_success())
1069    }
1070}
1071
1072#[async_trait]
1073impl ProtocolCommand for KnxCommand {
1074    fn protocol(&self) -> Protocol {
1075        Protocol::KnxIp
1076    }
1077
1078    fn default_port(&self) -> u16 {
1079        3671
1080    }
1081
1082    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1083        let output = ctx.output();
1084        let spinner = output.spinner("Starting KNX server...");
1085
1086        // Parse individual address
1087        let individual_address: IndividualAddress = self.individual_address.parse()
1088            .map_err(|_| crate::error::CliError::ExecutionFailed {
1089                message: format!("Invalid individual address: {}", self.individual_address)
1090            })?;
1091
1092        let config = KnxServerConfig {
1093            bind_addr: self.bind_addr,
1094            individual_address,
1095            max_connections: 256,
1096            ..Default::default()
1097        };
1098
1099        // Create group objects based on --groups parameter
1100        let group_table = Arc::new(GroupObjectTable::new());
1101        let dpt_types = [
1102            DptId::new(1, 1),   // Switch (bool)
1103            DptId::new(5, 1),   // Scaling (0-100%)
1104            DptId::new(9, 1),   // Temperature (float16)
1105            DptId::new(9, 4),   // Lux
1106            DptId::new(9, 7),   // Humidity
1107            DptId::new(12, 1),  // Counter (u32)
1108            DptId::new(13, 1),  // Counter signed (i32)
1109            DptId::new(14, 56), // Float (f32)
1110        ];
1111        let dpt_names = [
1112            "Switch", "Scaling", "Temperature", "Lux",
1113            "Humidity", "Counter", "SignedCounter", "Float",
1114        ];
1115
1116        for i in 0..self.group_objects {
1117            let main = ((i / 256) + 1) as u8;
1118            let middle = ((i / 8) % 8) as u8;
1119            let sub = (i % 256) as u8;
1120            let addr = GroupAddress::three_level(main, middle, sub);
1121            let dpt_idx = i % dpt_types.len();
1122            let name = format!("{}_{}", dpt_names[dpt_idx], i);
1123            if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1124                tracing::warn!("Failed to create group object {}: {}", i, e);
1125            }
1126        }
1127
1128        let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1129
1130        {
1131            let mut server_guard = self.server.lock().await;
1132            *server_guard = Some(server.clone());
1133        }
1134
1135        let server_clone = server.clone();
1136        let task = tokio::spawn(async move {
1137            if let Err(e) = server_clone.start().await {
1138                tracing::error!("KNX server error: {}", e);
1139            }
1140        });
1141
1142        {
1143            let mut task_guard = self.server_task.lock().await;
1144            *task_guard = Some(task);
1145        }
1146
1147        tokio::time::sleep(Duration::from_millis(100)).await;
1148
1149        spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1150        Ok(())
1151    }
1152
1153    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1154        // Take server out to call stop (KnxServer::stop has Send issues with parking_lot)
1155        let server_opt = self.server.lock().await.take();
1156        if let Some(server) = server_opt {
1157            // Use spawn_blocking to handle the non-Send future
1158            let _ = tokio::task::spawn_blocking(move || {
1159                let rt = tokio::runtime::Handle::current();
1160                rt.block_on(async {
1161                    let _ = server.stop().await;
1162                })
1163            }).await;
1164        }
1165
1166        if let Some(task) = self.server_task.lock().await.take() {
1167            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1168        }
1169
1170        Ok(())
1171    }
1172}