Skip to main content

mabi_cli/commands/
protocol.rs

1//! Protocol-specific commands.
2//!
3//! Provides subcommands for each supported protocol.
4
5use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19// Protocol-specific imports
20use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
21use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
22use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, default_object_descriptors};
23use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
24
25/// Base trait for protocol-specific commands.
26#[async_trait]
27pub trait ProtocolCommand: Command {
28    /// Get the protocol type.
29    fn protocol(&self) -> Protocol;
30
31    /// Get the default port.
32    fn default_port(&self) -> u16;
33
34    /// Start the protocol server.
35    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
36
37    /// Stop the protocol server.
38    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
39}
40
41// =============================================================================
42// Modbus Command
43// =============================================================================
44
45/// Modbus protocol command.
46pub struct ModbusCommand {
47    /// Binding address.
48    bind_addr: SocketAddr,
49    /// Number of devices to simulate.
50    devices: usize,
51    /// Points per device.
52    points_per_device: usize,
53    /// Use RTU mode instead of TCP.
54    rtu_mode: bool,
55    /// Serial port for RTU mode.
56    serial_port: Option<String>,
57    /// Device tags.
58    tags: Tags,
59    /// Server instance (for shutdown).
60    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
61    /// Server task handle.
62    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
63}
64
65impl ModbusCommand {
66    pub fn new() -> Self {
67        Self {
68            bind_addr: "0.0.0.0:502".parse().unwrap(),
69            devices: 1,
70            points_per_device: 100,
71            rtu_mode: false,
72            serial_port: None,
73            tags: Tags::new(),
74            server: Arc::new(Mutex::new(None)),
75            server_task: Arc::new(Mutex::new(None)),
76        }
77    }
78
79    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
80        self.bind_addr = addr;
81        self
82    }
83
84    pub fn with_port(mut self, port: u16) -> Self {
85        self.bind_addr.set_port(port);
86        self
87    }
88
89    pub fn with_devices(mut self, devices: usize) -> Self {
90        self.devices = devices;
91        self
92    }
93
94    pub fn with_points(mut self, points: usize) -> Self {
95        self.points_per_device = points;
96        self
97    }
98
99    pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
100        self.rtu_mode = true;
101        self.serial_port = Some(serial_port.into());
102        self
103    }
104
105    pub fn with_tags(mut self, tags: Tags) -> Self {
106        self.tags = tags;
107        self
108    }
109}
110
111impl Default for ModbusCommand {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117#[async_trait]
118impl Command for ModbusCommand {
119    fn name(&self) -> &str {
120        "modbus"
121    }
122
123    fn description(&self) -> &str {
124        "Start a Modbus TCP/RTU simulator"
125    }
126
127    fn requires_engine(&self) -> bool {
128        true
129    }
130
131    fn supports_shutdown(&self) -> bool {
132        true
133    }
134
135    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
136        let format = ctx.output().format();
137        let is_quiet = ctx.is_quiet();
138        let is_verbose = ctx.is_verbose();
139        let is_debug = ctx.is_debug();
140
141        if !is_quiet {
142            if matches!(format, OutputFormat::Table) {
143                let output = ctx.output();
144                if self.rtu_mode {
145                    output.header("Modbus RTU Simulator");
146                    output.kv(
147                        "Serial Port",
148                        self.serial_port.as_deref().unwrap_or("N/A"),
149                    );
150                } else {
151                    output.header("Modbus TCP Simulator");
152                    output.kv("Bind Address", self.bind_addr);
153                }
154                output.kv("Devices", self.devices);
155                output.kv("Points per Device", self.points_per_device);
156                output.kv("Total Points", self.devices * self.points_per_device);
157            }
158        }
159
160        // Verbose: show extra configuration details
161        if is_verbose {
162            ctx.vprintln(format!("  Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
163            ctx.vprintln(format!("  Points Distribution: {} per register type", self.points_per_device / 4));
164        }
165
166        // Debug: dump full configuration
167        if is_debug {
168            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
169            ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
170            ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
171        }
172
173        self.start_server(ctx).await?;
174
175        let points_per_type = self.points_per_device / 4;
176
177        if !is_quiet {
178            match format {
179                OutputFormat::Table => {
180                    let colors_enabled = ctx.colors_enabled();
181                    let builder = TableBuilder::new(colors_enabled)
182                        .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
183
184                    let devices = self.devices;
185                    let pts = points_per_type.to_string();
186                    let table = PaginatedTable::default().render(builder, devices, 6, |i| {
187                        let unit_id = (i + 1).to_string();
188                        (
189                            vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
190                            StatusType::Success,
191                        )
192                    });
193                    table.print();
194                }
195                _ => {
196                    #[derive(Serialize)]
197                    struct ModbusServerInfo {
198                        protocol: String,
199                        bind_address: String,
200                        devices: usize,
201                        points_per_device: usize,
202                        total_points: usize,
203                        rtu_mode: bool,
204                        serial_port: Option<String>,
205                        device_list: Vec<ModbusDeviceInfo>,
206                        status: String,
207                    }
208                    #[derive(Serialize)]
209                    struct ModbusDeviceInfo {
210                        unit_id: usize,
211                        holding_registers: usize,
212                        input_registers: usize,
213                        coils: usize,
214                        discrete_inputs: usize,
215                        status: String,
216                    }
217                    let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
218                        .map(|i| ModbusDeviceInfo {
219                            unit_id: i + 1,
220                            holding_registers: points_per_type,
221                            input_registers: points_per_type,
222                            coils: points_per_type,
223                            discrete_inputs: points_per_type,
224                            status: "Online".into(),
225                        })
226                        .collect();
227                    let info = ModbusServerInfo {
228                        protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
229                        bind_address: self.bind_addr.to_string(),
230                        devices: self.devices,
231                        points_per_device: self.points_per_device,
232                        total_points: self.devices * self.points_per_device,
233                        rtu_mode: self.rtu_mode,
234                        serial_port: self.serial_port.clone(),
235                        device_list,
236                        status: "Online".into(),
237                    };
238                    let _ = ctx.output().write(&info);
239                }
240            }
241        }
242
243        if !is_quiet {
244            ctx.output().info("Press Ctrl+C to stop");
245        }
246        ctx.shutdown_signal().notified().await;
247
248        self.stop_server(ctx).await?;
249        if !is_quiet {
250            ctx.output().success("Modbus simulator stopped");
251        }
252
253        Ok(CommandOutput::quiet_success())
254    }
255}
256
257#[async_trait]
258impl ProtocolCommand for ModbusCommand {
259    fn protocol(&self) -> Protocol {
260        if self.rtu_mode {
261            Protocol::ModbusRtu
262        } else {
263            Protocol::ModbusTcp
264        }
265    }
266
267    fn default_port(&self) -> u16 {
268        502
269    }
270
271    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
272        let output = ctx.output();
273        let spinner = output.spinner("Starting Modbus server...");
274
275        let config = ServerConfigV2 {
276            bind_address: self.bind_addr,
277            ..Default::default()
278        };
279
280        let server = Arc::new(ModbusTcpServerV2::new(config));
281
282        for i in 0..self.devices {
283            let unit_id = (i + 1) as u8;
284            let points = (self.points_per_device / 4) as u16;
285            let device_config = ModbusDeviceConfig {
286                unit_id,
287                name: format!("Device-{}", unit_id),
288                holding_registers: points,
289                input_registers: points,
290                coils: points,
291                discrete_inputs: points,
292                response_delay_ms: 0,
293                tags: self.tags.clone(),
294            };
295            let device = ModbusDevice::new(device_config);
296            server.add_device(device);
297        }
298
299        {
300            let mut server_guard = self.server.lock().await;
301            *server_guard = Some(server.clone());
302        }
303
304        let server_clone = server.clone();
305        let task = tokio::spawn(async move {
306            if let Err(e) = server_clone.run().await {
307                tracing::error!("Modbus server error: {}", e);
308            }
309        });
310
311        {
312            let mut task_guard = self.server_task.lock().await;
313            *task_guard = Some(task);
314        }
315
316        tokio::time::sleep(Duration::from_millis(100)).await;
317
318        spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
319        Ok(())
320    }
321
322    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
323        if let Some(server) = self.server.lock().await.as_ref() {
324            server.shutdown();
325        }
326
327        if let Some(task) = self.server_task.lock().await.take() {
328            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
329        }
330
331        Ok(())
332    }
333}
334
335// =============================================================================
336// OPC UA Command
337// =============================================================================
338
339/// OPC UA protocol command.
340pub struct OpcuaCommand {
341    bind_addr: SocketAddr,
342    endpoint_path: String,
343    nodes: usize,
344    security_mode: String,
345    /// Device tags.
346    tags: Tags,
347    /// Server instance (for shutdown).
348    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
349    /// Server task handle.
350    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
351}
352
353impl OpcuaCommand {
354    pub fn new() -> Self {
355        Self {
356            bind_addr: "0.0.0.0:4840".parse().unwrap(),
357            endpoint_path: "/".into(),
358            nodes: 1000,
359            security_mode: "None".into(),
360            tags: Tags::new(),
361            server: Arc::new(Mutex::new(None)),
362            server_task: Arc::new(Mutex::new(None)),
363        }
364    }
365
366    pub fn with_port(mut self, port: u16) -> Self {
367        self.bind_addr.set_port(port);
368        self
369    }
370
371    pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
372        self.endpoint_path = path.into();
373        self
374    }
375
376    pub fn with_nodes(mut self, nodes: usize) -> Self {
377        self.nodes = nodes;
378        self
379    }
380
381    pub fn with_security(mut self, mode: impl Into<String>) -> Self {
382        self.security_mode = mode.into();
383        self
384    }
385
386    pub fn with_tags(mut self, tags: Tags) -> Self {
387        self.tags = tags;
388        self
389    }
390}
391
392impl Default for OpcuaCommand {
393    fn default() -> Self {
394        Self::new()
395    }
396}
397
398#[async_trait]
399impl Command for OpcuaCommand {
400    fn name(&self) -> &str {
401        "opcua"
402    }
403
404    fn description(&self) -> &str {
405        "Start an OPC UA server simulator"
406    }
407
408    fn requires_engine(&self) -> bool {
409        true
410    }
411
412    fn supports_shutdown(&self) -> bool {
413        true
414    }
415
416    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
417        let format = ctx.output().format();
418        let is_quiet = ctx.is_quiet();
419        let is_verbose = ctx.is_verbose();
420        let is_debug = ctx.is_debug();
421
422        if !is_quiet {
423            if matches!(format, OutputFormat::Table) {
424                let output = ctx.output();
425                output.header("OPC UA Simulator");
426                output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
427                output.kv("Nodes", self.nodes);
428                output.kv("Security Mode", &self.security_mode);
429            }
430        }
431
432        // Verbose: show extra details
433        if is_verbose {
434            ctx.vprintln(format!("  Bind Address: {}", self.bind_addr));
435            ctx.vprintln(format!("  Endpoint Path: {}", self.endpoint_path));
436            ctx.vprintln(format!("  Max Subscriptions: 1000"));
437            ctx.vprintln(format!("  Max Monitored Items: 10000"));
438        }
439
440        // Debug: dump full configuration
441        if is_debug {
442            ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
443            ctx.dprintln(format!("Node count: {}", self.nodes));
444            ctx.dprintln(format!("Security mode: {}", self.security_mode));
445            ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
446        }
447
448        self.start_server(ctx).await?;
449
450        if !is_quiet {
451            match format {
452                OutputFormat::Table => {
453                    let colors_enabled = ctx.colors_enabled();
454                    let table = TableBuilder::new(colors_enabled)
455                        .header(["Namespace", "Nodes", "Subscriptions", "Status"])
456                        .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
457                        .status_row(
458                            ["1", &self.nodes.to_string(), "0", "Online"],
459                            StatusType::Success,
460                        );
461                    table.print();
462                }
463                _ => {
464                    #[derive(Serialize)]
465                    struct OpcuaServerInfo {
466                        protocol: String,
467                        endpoint: String,
468                        nodes: usize,
469                        security_mode: String,
470                        namespaces: Vec<NamespaceInfo>,
471                        status: String,
472                    }
473                    #[derive(Serialize)]
474                    struct NamespaceInfo {
475                        index: u32,
476                        nodes: String,
477                        subscriptions: u32,
478                        status: String,
479                    }
480                    let info = OpcuaServerInfo {
481                        protocol: "OPC UA".into(),
482                        endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
483                        nodes: self.nodes,
484                        security_mode: self.security_mode.clone(),
485                        namespaces: vec![
486                            NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
487                            NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
488                        ],
489                        status: "Online".into(),
490                    };
491                    let _ = ctx.output().write(&info);
492                }
493            }
494        }
495
496        if !is_quiet {
497            ctx.output().info("Press Ctrl+C to stop");
498        }
499        ctx.shutdown_signal().notified().await;
500
501        self.stop_server(ctx).await?;
502        if !is_quiet {
503            ctx.output().success("OPC UA simulator stopped");
504        }
505
506        Ok(CommandOutput::quiet_success())
507    }
508}
509
510#[async_trait]
511impl ProtocolCommand for OpcuaCommand {
512    fn protocol(&self) -> Protocol {
513        Protocol::OpcUa
514    }
515
516    fn default_port(&self) -> u16 {
517        4840
518    }
519
520    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
521        let output = ctx.output();
522        let spinner = output.spinner("Starting OPC UA server...");
523
524        let config = OpcUaServerConfig {
525            endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
526            server_name: "Mabinogion OPC UA Simulator".to_string(),
527            max_subscriptions: 1000,
528            max_monitored_items: 10000,
529            ..Default::default()
530        };
531
532        let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
533            crate::error::CliError::ExecutionFailed {
534                message: format!("Failed to create OPC UA server: {}", e)
535            }
536        })?);
537
538        // Add sample nodes with diverse types and mixed read-only / writable access.
539        // Even-indexed nodes are writable, odd-indexed are read-only.
540        let node_count = self.nodes.min(100);
541        for i in 0..node_count {
542            let node_id = format!("ns=2;i={}", 1000 + i);
543            let name = format!("Variable_{}", i);
544            let value = (i as f64) * 0.1;
545
546            if i % 2 == 0 {
547                let _ = server.add_writable_variable(node_id, name, value);
548            } else {
549                let _ = server.add_variable(node_id, name, value);
550            }
551        }
552
553        {
554            let mut server_guard = self.server.lock().await;
555            *server_guard = Some(server.clone());
556        }
557
558        let server_clone = server.clone();
559        let task = tokio::spawn(async move {
560            if let Err(e) = server_clone.start().await {
561                tracing::error!("OPC UA server error: {}", e);
562            }
563        });
564
565        {
566            let mut task_guard = self.server_task.lock().await;
567            *task_guard = Some(task);
568        }
569
570        tokio::time::sleep(Duration::from_millis(100)).await;
571
572        spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
573        Ok(())
574    }
575
576    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
577        if let Some(server) = self.server.lock().await.as_ref() {
578            let _ = server.stop().await;
579        }
580
581        if let Some(task) = self.server_task.lock().await.take() {
582            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
583        }
584
585        Ok(())
586    }
587}
588
589// =============================================================================
590// BACnet Command
591// =============================================================================
592
593/// BACnet protocol command.
594pub struct BacnetCommand {
595    bind_addr: SocketAddr,
596    device_instance: u32,
597    objects: usize,
598    bbmd_enabled: bool,
599    /// Device tags.
600    tags: Tags,
601    /// Server instance (for shutdown).
602    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
603    /// Server task handle.
604    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
605}
606
607impl BacnetCommand {
608    pub fn new() -> Self {
609        Self {
610            bind_addr: "0.0.0.0:47808".parse().unwrap(),
611            device_instance: 1234,
612            objects: 100,
613            bbmd_enabled: false,
614            tags: Tags::new(),
615            server: Arc::new(Mutex::new(None)),
616            server_task: Arc::new(Mutex::new(None)),
617        }
618    }
619
620    pub fn with_port(mut self, port: u16) -> Self {
621        self.bind_addr.set_port(port);
622        self
623    }
624
625    pub fn with_device_instance(mut self, instance: u32) -> Self {
626        self.device_instance = instance;
627        self
628    }
629
630    pub fn with_objects(mut self, objects: usize) -> Self {
631        self.objects = objects;
632        self
633    }
634
635    pub fn with_bbmd(mut self, enabled: bool) -> Self {
636        self.bbmd_enabled = enabled;
637        self
638    }
639
640    pub fn with_tags(mut self, tags: Tags) -> Self {
641        self.tags = tags;
642        self
643    }
644}
645
646impl Default for BacnetCommand {
647    fn default() -> Self {
648        Self::new()
649    }
650}
651
652#[async_trait]
653impl Command for BacnetCommand {
654    fn name(&self) -> &str {
655        "bacnet"
656    }
657
658    fn description(&self) -> &str {
659        "Start a BACnet/IP simulator"
660    }
661
662    fn requires_engine(&self) -> bool {
663        true
664    }
665
666    fn supports_shutdown(&self) -> bool {
667        true
668    }
669
670    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
671        let format = ctx.output().format();
672        let is_quiet = ctx.is_quiet();
673        let is_verbose = ctx.is_verbose();
674        let is_debug = ctx.is_debug();
675
676        if !is_quiet {
677            if matches!(format, OutputFormat::Table) {
678                let output = ctx.output();
679                output.header("BACnet/IP Simulator");
680                output.kv("Bind Address", self.bind_addr);
681                output.kv("Device Instance", self.device_instance);
682                output.kv("Objects", self.objects);
683                output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
684            }
685        }
686
687        // Verbose: show extra details
688        if is_verbose {
689            let per_type = self.objects / 4;
690            ctx.vprintln(format!("  Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
691            ctx.vprintln(format!("  Device Name: Mabinogion BACnet Simulator"));
692        }
693
694        // Debug: dump full configuration
695        if is_debug {
696            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
697            ctx.dprintln(format!("Device instance: {}", self.device_instance));
698            ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
699        }
700
701        self.start_server(ctx).await?;
702
703        let per_type = self.objects / 4;
704
705        if !is_quiet {
706            match format {
707                OutputFormat::Table => {
708                    let colors_enabled = ctx.colors_enabled();
709                    let table = TableBuilder::new(colors_enabled)
710                        .header(["Object Type", "Count", "Status"])
711                        .status_row(["Device", "1", "Online"], StatusType::Success)
712                        .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
713                        .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
714                        .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
715                        .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
716                    table.print();
717                }
718                _ => {
719                    #[derive(Serialize)]
720                    struct BacnetServerInfo {
721                        protocol: String,
722                        bind_address: String,
723                        device_instance: u32,
724                        objects: usize,
725                        bbmd_enabled: bool,
726                        object_types: Vec<ObjectTypeInfo>,
727                        status: String,
728                    }
729                    #[derive(Serialize)]
730                    struct ObjectTypeInfo {
731                        object_type: String,
732                        count: usize,
733                        status: String,
734                    }
735                    let info = BacnetServerInfo {
736                        protocol: "BACnet/IP".into(),
737                        bind_address: self.bind_addr.to_string(),
738                        device_instance: self.device_instance,
739                        objects: self.objects,
740                        bbmd_enabled: self.bbmd_enabled,
741                        object_types: vec![
742                            ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
743                            ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
744                            ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
745                            ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
746                            ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
747                        ],
748                        status: "Online".into(),
749                    };
750                    let _ = ctx.output().write(&info);
751                }
752            }
753        }
754
755        if !is_quiet {
756            ctx.output().info("Press Ctrl+C to stop");
757        }
758        ctx.shutdown_signal().notified().await;
759
760        self.stop_server(ctx).await?;
761        if !is_quiet {
762            ctx.output().success("BACnet simulator stopped");
763        }
764
765        Ok(CommandOutput::quiet_success())
766    }
767}
768
769#[async_trait]
770impl ProtocolCommand for BacnetCommand {
771    fn protocol(&self) -> Protocol {
772        Protocol::BacnetIp
773    }
774
775    fn default_port(&self) -> u16 {
776        47808
777    }
778
779    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
780        let output = ctx.output();
781        let spinner = output.spinner("Starting BACnet server...");
782
783        let config = BacnetServerConfig::new(self.device_instance)
784            .with_bind_addr(self.bind_addr)
785            .with_device_name("Mabinogion BACnet Simulator");
786
787        // Create object registry with sample objects
788        let registry = ObjectRegistry::new();
789
790        let descriptors = default_object_descriptors();
791        let objects_per_type = self.objects / descriptors.len();
792        registry.populate_standard_objects(&descriptors, objects_per_type);
793
794        let server = Arc::new(BACnetServer::new(config, registry));
795
796        {
797            let mut server_guard = self.server.lock().await;
798            *server_guard = Some(server.clone());
799        }
800
801        let server_clone = server.clone();
802        let task = tokio::spawn(async move {
803            if let Err(e) = server_clone.run().await {
804                tracing::error!("BACnet server error: {}", e);
805            }
806        });
807
808        {
809            let mut task_guard = self.server_task.lock().await;
810            *task_guard = Some(task);
811        }
812
813        tokio::time::sleep(Duration::from_millis(100)).await;
814
815        spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
816        Ok(())
817    }
818
819    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
820        if let Some(server) = self.server.lock().await.as_ref() {
821            server.shutdown();
822        }
823
824        if let Some(task) = self.server_task.lock().await.take() {
825            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
826        }
827
828        Ok(())
829    }
830}
831
832// =============================================================================
833// KNX Command
834// =============================================================================
835
836/// KNX protocol command.
837pub struct KnxCommand {
838    bind_addr: SocketAddr,
839    individual_address: String,
840    group_objects: usize,
841    /// Device tags.
842    tags: Tags,
843    /// Server instance (for shutdown).
844    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
845    /// Server task handle.
846    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
847}
848
849impl KnxCommand {
850    pub fn new() -> Self {
851        Self {
852            bind_addr: "0.0.0.0:3671".parse().unwrap(),
853            individual_address: "1.1.1".into(),
854            group_objects: 100,
855            tags: Tags::new(),
856            server: Arc::new(Mutex::new(None)),
857            server_task: Arc::new(Mutex::new(None)),
858        }
859    }
860
861    pub fn with_port(mut self, port: u16) -> Self {
862        self.bind_addr.set_port(port);
863        self
864    }
865
866    pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
867        self.individual_address = addr.into();
868        self
869    }
870
871    pub fn with_group_objects(mut self, count: usize) -> Self {
872        self.group_objects = count;
873        self
874    }
875
876    pub fn with_tags(mut self, tags: Tags) -> Self {
877        self.tags = tags;
878        self
879    }
880}
881
882impl Default for KnxCommand {
883    fn default() -> Self {
884        Self::new()
885    }
886}
887
888#[async_trait]
889impl Command for KnxCommand {
890    fn name(&self) -> &str {
891        "knx"
892    }
893
894    fn description(&self) -> &str {
895        "Start a KNXnet/IP simulator"
896    }
897
898    fn requires_engine(&self) -> bool {
899        true
900    }
901
902    fn supports_shutdown(&self) -> bool {
903        true
904    }
905
906    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
907        let format = ctx.output().format();
908        let is_quiet = ctx.is_quiet();
909        let is_verbose = ctx.is_verbose();
910        let is_debug = ctx.is_debug();
911
912        if !is_quiet {
913            if matches!(format, OutputFormat::Table) {
914                let output = ctx.output();
915                output.header("KNXnet/IP Simulator");
916                output.kv("Bind Address", self.bind_addr);
917                output.kv("Individual Address", &self.individual_address);
918                output.kv("Group Objects", self.group_objects);
919            }
920        }
921
922        // Verbose: show extra details
923        if is_verbose {
924            ctx.vprintln(format!("  Max Connections: 10"));
925            ctx.vprintln(format!("  Services: Core, Device Management, Tunneling"));
926        }
927
928        // Debug: dump full configuration
929        if is_debug {
930            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
931            ctx.dprintln(format!("Individual address: {}", self.individual_address));
932            ctx.dprintln(format!("Group objects: {}", self.group_objects));
933        }
934
935        self.start_server(ctx).await?;
936
937        if !is_quiet {
938            match format {
939                OutputFormat::Table => {
940                    let colors_enabled = ctx.colors_enabled();
941                    let table = TableBuilder::new(colors_enabled)
942                        .header(["Service", "Status"])
943                        .status_row(["Core", "Ready"], StatusType::Success)
944                        .status_row(["Device Management", "Ready"], StatusType::Success)
945                        .status_row(["Tunneling", "Ready"], StatusType::Success);
946                    table.print();
947                }
948                _ => {
949                    #[derive(Serialize)]
950                    struct KnxServerInfo {
951                        protocol: String,
952                        bind_address: String,
953                        individual_address: String,
954                        group_objects: usize,
955                        services: Vec<ServiceInfo>,
956                        status: String,
957                    }
958                    #[derive(Serialize)]
959                    struct ServiceInfo {
960                        service: String,
961                        status: String,
962                    }
963                    let info = KnxServerInfo {
964                        protocol: "KNXnet/IP".into(),
965                        bind_address: self.bind_addr.to_string(),
966                        individual_address: self.individual_address.clone(),
967                        group_objects: self.group_objects,
968                        services: vec![
969                            ServiceInfo { service: "Core".into(), status: "Ready".into() },
970                            ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
971                            ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
972                        ],
973                        status: "Online".into(),
974                    };
975                    let _ = ctx.output().write(&info);
976                }
977            }
978        }
979
980        if !is_quiet {
981            ctx.output().info("Press Ctrl+C to stop");
982        }
983        ctx.shutdown_signal().notified().await;
984
985        self.stop_server(ctx).await?;
986        if !is_quiet {
987            ctx.output().success("KNX simulator stopped");
988        }
989
990        Ok(CommandOutput::quiet_success())
991    }
992}
993
994#[async_trait]
995impl ProtocolCommand for KnxCommand {
996    fn protocol(&self) -> Protocol {
997        Protocol::KnxIp
998    }
999
1000    fn default_port(&self) -> u16 {
1001        3671
1002    }
1003
1004    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1005        let output = ctx.output();
1006        let spinner = output.spinner("Starting KNX server...");
1007
1008        // Parse individual address
1009        let individual_address: IndividualAddress = self.individual_address.parse()
1010            .map_err(|_| crate::error::CliError::ExecutionFailed {
1011                message: format!("Invalid individual address: {}", self.individual_address)
1012            })?;
1013
1014        let config = KnxServerConfig {
1015            bind_addr: self.bind_addr,
1016            individual_address,
1017            max_connections: 256,
1018            ..Default::default()
1019        };
1020
1021        // Create group objects based on --groups parameter
1022        let group_table = Arc::new(GroupObjectTable::new());
1023        let dpt_types = [
1024            DptId::new(1, 1),   // Switch (bool)
1025            DptId::new(5, 1),   // Scaling (0-100%)
1026            DptId::new(9, 1),   // Temperature (float16)
1027            DptId::new(9, 4),   // Lux
1028            DptId::new(9, 7),   // Humidity
1029            DptId::new(12, 1),  // Counter (u32)
1030            DptId::new(13, 1),  // Counter signed (i32)
1031            DptId::new(14, 56), // Float (f32)
1032        ];
1033        let dpt_names = [
1034            "Switch", "Scaling", "Temperature", "Lux",
1035            "Humidity", "Counter", "SignedCounter", "Float",
1036        ];
1037
1038        for i in 0..self.group_objects {
1039            let main = ((i / 256) + 1) as u8;
1040            let middle = ((i / 8) % 8) as u8;
1041            let sub = (i % 256) as u8;
1042            let addr = GroupAddress::three_level(main, middle, sub);
1043            let dpt_idx = i % dpt_types.len();
1044            let name = format!("{}_{}", dpt_names[dpt_idx], i);
1045            if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1046                tracing::warn!("Failed to create group object {}: {}", i, e);
1047            }
1048        }
1049
1050        let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1051
1052        {
1053            let mut server_guard = self.server.lock().await;
1054            *server_guard = Some(server.clone());
1055        }
1056
1057        let server_clone = server.clone();
1058        let task = tokio::spawn(async move {
1059            if let Err(e) = server_clone.start().await {
1060                tracing::error!("KNX server error: {}", e);
1061            }
1062        });
1063
1064        {
1065            let mut task_guard = self.server_task.lock().await;
1066            *task_guard = Some(task);
1067        }
1068
1069        tokio::time::sleep(Duration::from_millis(100)).await;
1070
1071        spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1072        Ok(())
1073    }
1074
1075    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1076        // Take server out to call stop (KnxServer::stop has Send issues with parking_lot)
1077        let server_opt = self.server.lock().await.take();
1078        if let Some(server) = server_opt {
1079            // Use spawn_blocking to handle the non-Send future
1080            let _ = tokio::task::spawn_blocking(move || {
1081                let rt = tokio::runtime::Handle::current();
1082                rt.block_on(async {
1083                    let _ = server.stop().await;
1084                })
1085            }).await;
1086        }
1087
1088        if let Some(task) = self.server_task.lock().await.take() {
1089            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1090        }
1091
1092        Ok(())
1093    }
1094}