Skip to main content

mabi_cli/commands/
protocol.rs

1//! Protocol-specific commands.
2//!
3//! Provides subcommands for each supported protocol.
4
5use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19// Protocol-specific imports
20use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
21use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
22use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
23use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
24
25/// Base trait for protocol-specific commands.
26#[async_trait]
27pub trait ProtocolCommand: Command {
28    /// Get the protocol type.
29    fn protocol(&self) -> Protocol;
30
31    /// Get the default port.
32    fn default_port(&self) -> u16;
33
34    /// Start the protocol server.
35    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
36
37    /// Stop the protocol server.
38    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
39}
40
41// =============================================================================
42// Modbus Command
43// =============================================================================
44
45/// Modbus protocol command.
46pub struct ModbusCommand {
47    /// Binding address.
48    bind_addr: SocketAddr,
49    /// Number of devices to simulate.
50    devices: usize,
51    /// Points per device.
52    points_per_device: usize,
53    /// Use RTU mode instead of TCP.
54    rtu_mode: bool,
55    /// Serial port for RTU mode.
56    serial_port: Option<String>,
57    /// Device tags.
58    tags: Tags,
59    /// Server instance (for shutdown).
60    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
61    /// Server task handle.
62    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
63}
64
65impl ModbusCommand {
66    pub fn new() -> Self {
67        Self {
68            bind_addr: "0.0.0.0:502".parse().unwrap(),
69            devices: 1,
70            points_per_device: 100,
71            rtu_mode: false,
72            serial_port: None,
73            tags: Tags::new(),
74            server: Arc::new(Mutex::new(None)),
75            server_task: Arc::new(Mutex::new(None)),
76        }
77    }
78
79    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
80        self.bind_addr = addr;
81        self
82    }
83
84    pub fn with_port(mut self, port: u16) -> Self {
85        self.bind_addr.set_port(port);
86        self
87    }
88
89    pub fn with_devices(mut self, devices: usize) -> Self {
90        self.devices = devices;
91        self
92    }
93
94    pub fn with_points(mut self, points: usize) -> Self {
95        self.points_per_device = points;
96        self
97    }
98
99    pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
100        self.rtu_mode = true;
101        self.serial_port = Some(serial_port.into());
102        self
103    }
104
105    pub fn with_tags(mut self, tags: Tags) -> Self {
106        self.tags = tags;
107        self
108    }
109}
110
111impl Default for ModbusCommand {
112    fn default() -> Self {
113        Self::new()
114    }
115}
116
117#[async_trait]
118impl Command for ModbusCommand {
119    fn name(&self) -> &str {
120        "modbus"
121    }
122
123    fn description(&self) -> &str {
124        "Start a Modbus TCP/RTU simulator"
125    }
126
127    fn requires_engine(&self) -> bool {
128        true
129    }
130
131    fn supports_shutdown(&self) -> bool {
132        true
133    }
134
135    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
136        let format = ctx.output().format();
137        let is_quiet = ctx.is_quiet();
138        let is_verbose = ctx.is_verbose();
139        let is_debug = ctx.is_debug();
140
141        if !is_quiet {
142            if matches!(format, OutputFormat::Table) {
143                let output = ctx.output();
144                if self.rtu_mode {
145                    output.header("Modbus RTU Simulator");
146                    output.kv(
147                        "Serial Port",
148                        self.serial_port.as_deref().unwrap_or("N/A"),
149                    );
150                } else {
151                    output.header("Modbus TCP Simulator");
152                    output.kv("Bind Address", self.bind_addr);
153                }
154                output.kv("Devices", self.devices);
155                output.kv("Points per Device", self.points_per_device);
156                output.kv("Total Points", self.devices * self.points_per_device);
157            }
158        }
159
160        // Verbose: show extra configuration details
161        if is_verbose {
162            ctx.vprintln(format!("  Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
163            ctx.vprintln(format!("  Points Distribution: {} per register type", self.points_per_device / 4));
164        }
165
166        // Debug: dump full configuration
167        if is_debug {
168            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
169            ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
170            ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
171        }
172
173        self.start_server(ctx).await?;
174
175        let points_per_type = self.points_per_device / 4;
176
177        if !is_quiet {
178            match format {
179                OutputFormat::Table => {
180                    let colors_enabled = ctx.colors_enabled();
181                    let builder = TableBuilder::new(colors_enabled)
182                        .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
183
184                    let devices = self.devices;
185                    let pts = points_per_type.to_string();
186                    let table = PaginatedTable::default().render(builder, devices, 6, |i| {
187                        let unit_id = (i + 1).to_string();
188                        (
189                            vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
190                            StatusType::Success,
191                        )
192                    });
193                    table.print();
194                }
195                _ => {
196                    #[derive(Serialize)]
197                    struct ModbusServerInfo {
198                        protocol: String,
199                        bind_address: String,
200                        devices: usize,
201                        points_per_device: usize,
202                        total_points: usize,
203                        rtu_mode: bool,
204                        serial_port: Option<String>,
205                        device_list: Vec<ModbusDeviceInfo>,
206                        status: String,
207                    }
208                    #[derive(Serialize)]
209                    struct ModbusDeviceInfo {
210                        unit_id: usize,
211                        holding_registers: usize,
212                        input_registers: usize,
213                        coils: usize,
214                        discrete_inputs: usize,
215                        status: String,
216                    }
217                    let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
218                        .map(|i| ModbusDeviceInfo {
219                            unit_id: i + 1,
220                            holding_registers: points_per_type,
221                            input_registers: points_per_type,
222                            coils: points_per_type,
223                            discrete_inputs: points_per_type,
224                            status: "Online".into(),
225                        })
226                        .collect();
227                    let info = ModbusServerInfo {
228                        protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
229                        bind_address: self.bind_addr.to_string(),
230                        devices: self.devices,
231                        points_per_device: self.points_per_device,
232                        total_points: self.devices * self.points_per_device,
233                        rtu_mode: self.rtu_mode,
234                        serial_port: self.serial_port.clone(),
235                        device_list,
236                        status: "Online".into(),
237                    };
238                    let _ = ctx.output().write(&info);
239                }
240            }
241        }
242
243        if !is_quiet {
244            ctx.output().info("Press Ctrl+C to stop");
245        }
246        ctx.shutdown_signal().notified().await;
247
248        self.stop_server(ctx).await?;
249        if !is_quiet {
250            ctx.output().success("Modbus simulator stopped");
251        }
252
253        Ok(CommandOutput::quiet_success())
254    }
255}
256
257#[async_trait]
258impl ProtocolCommand for ModbusCommand {
259    fn protocol(&self) -> Protocol {
260        if self.rtu_mode {
261            Protocol::ModbusRtu
262        } else {
263            Protocol::ModbusTcp
264        }
265    }
266
267    fn default_port(&self) -> u16 {
268        502
269    }
270
271    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
272        let output = ctx.output();
273        let spinner = output.spinner("Starting Modbus server...");
274
275        let config = ServerConfigV2 {
276            bind_address: self.bind_addr,
277            ..Default::default()
278        };
279
280        let server = Arc::new(ModbusTcpServerV2::new(config));
281
282        for i in 0..self.devices {
283            let unit_id = (i + 1) as u8;
284            let points = (self.points_per_device / 4) as u16;
285            let device_config = ModbusDeviceConfig {
286                unit_id,
287                name: format!("Device-{}", unit_id),
288                holding_registers: points,
289                input_registers: points,
290                coils: points,
291                discrete_inputs: points,
292                response_delay_ms: 0,
293                tags: self.tags.clone(),
294            };
295            let device = ModbusDevice::new(device_config);
296            server.add_device(device);
297        }
298
299        {
300            let mut server_guard = self.server.lock().await;
301            *server_guard = Some(server.clone());
302        }
303
304        let server_clone = server.clone();
305        let task = tokio::spawn(async move {
306            if let Err(e) = server_clone.run().await {
307                tracing::error!("Modbus server error: {}", e);
308            }
309        });
310
311        {
312            let mut task_guard = self.server_task.lock().await;
313            *task_guard = Some(task);
314        }
315
316        tokio::time::sleep(Duration::from_millis(100)).await;
317
318        spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
319        Ok(())
320    }
321
322    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
323        if let Some(server) = self.server.lock().await.as_ref() {
324            server.shutdown();
325        }
326
327        if let Some(task) = self.server_task.lock().await.take() {
328            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
329        }
330
331        Ok(())
332    }
333}
334
335// =============================================================================
336// OPC UA Command
337// =============================================================================
338
339/// OPC UA protocol command.
340pub struct OpcuaCommand {
341    bind_addr: SocketAddr,
342    endpoint_path: String,
343    nodes: usize,
344    security_mode: String,
345    /// Device tags.
346    tags: Tags,
347    /// Server instance (for shutdown).
348    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
349    /// Server task handle.
350    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
351}
352
353impl OpcuaCommand {
354    pub fn new() -> Self {
355        Self {
356            bind_addr: "0.0.0.0:4840".parse().unwrap(),
357            endpoint_path: "/".into(),
358            nodes: 1000,
359            security_mode: "None".into(),
360            tags: Tags::new(),
361            server: Arc::new(Mutex::new(None)),
362            server_task: Arc::new(Mutex::new(None)),
363        }
364    }
365
366    pub fn with_port(mut self, port: u16) -> Self {
367        self.bind_addr.set_port(port);
368        self
369    }
370
371    pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
372        self.endpoint_path = path.into();
373        self
374    }
375
376    pub fn with_nodes(mut self, nodes: usize) -> Self {
377        self.nodes = nodes;
378        self
379    }
380
381    pub fn with_security(mut self, mode: impl Into<String>) -> Self {
382        self.security_mode = mode.into();
383        self
384    }
385
386    pub fn with_tags(mut self, tags: Tags) -> Self {
387        self.tags = tags;
388        self
389    }
390}
391
392impl Default for OpcuaCommand {
393    fn default() -> Self {
394        Self::new()
395    }
396}
397
398#[async_trait]
399impl Command for OpcuaCommand {
400    fn name(&self) -> &str {
401        "opcua"
402    }
403
404    fn description(&self) -> &str {
405        "Start an OPC UA server simulator"
406    }
407
408    fn requires_engine(&self) -> bool {
409        true
410    }
411
412    fn supports_shutdown(&self) -> bool {
413        true
414    }
415
416    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
417        let format = ctx.output().format();
418        let is_quiet = ctx.is_quiet();
419        let is_verbose = ctx.is_verbose();
420        let is_debug = ctx.is_debug();
421
422        if !is_quiet {
423            if matches!(format, OutputFormat::Table) {
424                let output = ctx.output();
425                output.header("OPC UA Simulator");
426                output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
427                output.kv("Nodes", self.nodes);
428                output.kv("Security Mode", &self.security_mode);
429            }
430        }
431
432        // Verbose: show extra details
433        if is_verbose {
434            ctx.vprintln(format!("  Bind Address: {}", self.bind_addr));
435            ctx.vprintln(format!("  Endpoint Path: {}", self.endpoint_path));
436            ctx.vprintln(format!("  Max Subscriptions: 1000"));
437            ctx.vprintln(format!("  Max Monitored Items: 10000"));
438        }
439
440        // Debug: dump full configuration
441        if is_debug {
442            ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
443            ctx.dprintln(format!("Node count: {}", self.nodes));
444            ctx.dprintln(format!("Security mode: {}", self.security_mode));
445            ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
446        }
447
448        self.start_server(ctx).await?;
449
450        if !is_quiet {
451            match format {
452                OutputFormat::Table => {
453                    let colors_enabled = ctx.colors_enabled();
454                    let table = TableBuilder::new(colors_enabled)
455                        .header(["Namespace", "Nodes", "Subscriptions", "Status"])
456                        .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
457                        .status_row(
458                            ["1", &self.nodes.to_string(), "0", "Online"],
459                            StatusType::Success,
460                        );
461                    table.print();
462                }
463                _ => {
464                    #[derive(Serialize)]
465                    struct OpcuaServerInfo {
466                        protocol: String,
467                        endpoint: String,
468                        nodes: usize,
469                        security_mode: String,
470                        namespaces: Vec<NamespaceInfo>,
471                        status: String,
472                    }
473                    #[derive(Serialize)]
474                    struct NamespaceInfo {
475                        index: u32,
476                        nodes: String,
477                        subscriptions: u32,
478                        status: String,
479                    }
480                    let info = OpcuaServerInfo {
481                        protocol: "OPC UA".into(),
482                        endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
483                        nodes: self.nodes,
484                        security_mode: self.security_mode.clone(),
485                        namespaces: vec![
486                            NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
487                            NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
488                        ],
489                        status: "Online".into(),
490                    };
491                    let _ = ctx.output().write(&info);
492                }
493            }
494        }
495
496        if !is_quiet {
497            ctx.output().info("Press Ctrl+C to stop");
498        }
499        ctx.shutdown_signal().notified().await;
500
501        self.stop_server(ctx).await?;
502        if !is_quiet {
503            ctx.output().success("OPC UA simulator stopped");
504        }
505
506        Ok(CommandOutput::quiet_success())
507    }
508}
509
510#[async_trait]
511impl ProtocolCommand for OpcuaCommand {
512    fn protocol(&self) -> Protocol {
513        Protocol::OpcUa
514    }
515
516    fn default_port(&self) -> u16 {
517        4840
518    }
519
520    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
521        let output = ctx.output();
522        let spinner = output.spinner("Starting OPC UA server...");
523
524        let config = OpcUaServerConfig {
525            endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
526            server_name: "Mabinogion OPC UA Simulator".to_string(),
527            max_subscriptions: 1000,
528            max_monitored_items: 10000,
529            ..Default::default()
530        };
531
532        let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
533            crate::error::CliError::ExecutionFailed {
534                message: format!("Failed to create OPC UA server: {}", e)
535            }
536        })?);
537
538        // Add sample nodes with diverse types and mixed read-only / writable access.
539        // Even-indexed nodes are writable, odd-indexed are read-only.
540        let node_count = self.nodes.min(100);
541        for i in 0..node_count {
542            let node_id = format!("ns=2;i={}", 1000 + i);
543            let name = format!("Variable_{}", i);
544            let value = (i as f64) * 0.1;
545
546            if i % 2 == 0 {
547                let _ = server.add_writable_variable(node_id, name, value);
548            } else {
549                let _ = server.add_variable(node_id, name, value);
550            }
551        }
552
553        {
554            let mut server_guard = self.server.lock().await;
555            *server_guard = Some(server.clone());
556        }
557
558        let server_clone = server.clone();
559        let task = tokio::spawn(async move {
560            if let Err(e) = server_clone.start().await {
561                tracing::error!("OPC UA server error: {}", e);
562            }
563        });
564
565        {
566            let mut task_guard = self.server_task.lock().await;
567            *task_guard = Some(task);
568        }
569
570        tokio::time::sleep(Duration::from_millis(100)).await;
571
572        spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
573        Ok(())
574    }
575
576    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
577        if let Some(server) = self.server.lock().await.as_ref() {
578            let _ = server.stop().await;
579        }
580
581        if let Some(task) = self.server_task.lock().await.take() {
582            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
583        }
584
585        Ok(())
586    }
587}
588
589// =============================================================================
590// BACnet Command
591// =============================================================================
592
593/// BACnet protocol command.
594pub struct BacnetCommand {
595    bind_addr: SocketAddr,
596    device_instance: u32,
597    objects: usize,
598    bbmd_enabled: bool,
599    /// Device tags.
600    tags: Tags,
601    /// Server instance (for shutdown).
602    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
603    /// Server task handle.
604    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
605}
606
607impl BacnetCommand {
608    pub fn new() -> Self {
609        Self {
610            bind_addr: "0.0.0.0:47808".parse().unwrap(),
611            device_instance: 1234,
612            objects: 100,
613            bbmd_enabled: false,
614            tags: Tags::new(),
615            server: Arc::new(Mutex::new(None)),
616            server_task: Arc::new(Mutex::new(None)),
617        }
618    }
619
620    pub fn with_port(mut self, port: u16) -> Self {
621        self.bind_addr.set_port(port);
622        self
623    }
624
625    pub fn with_device_instance(mut self, instance: u32) -> Self {
626        self.device_instance = instance;
627        self
628    }
629
630    pub fn with_objects(mut self, objects: usize) -> Self {
631        self.objects = objects;
632        self
633    }
634
635    pub fn with_bbmd(mut self, enabled: bool) -> Self {
636        self.bbmd_enabled = enabled;
637        self
638    }
639
640    pub fn with_tags(mut self, tags: Tags) -> Self {
641        self.tags = tags;
642        self
643    }
644}
645
646impl Default for BacnetCommand {
647    fn default() -> Self {
648        Self::new()
649    }
650}
651
652#[async_trait]
653impl Command for BacnetCommand {
654    fn name(&self) -> &str {
655        "bacnet"
656    }
657
658    fn description(&self) -> &str {
659        "Start a BACnet/IP simulator"
660    }
661
662    fn requires_engine(&self) -> bool {
663        true
664    }
665
666    fn supports_shutdown(&self) -> bool {
667        true
668    }
669
670    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
671        let format = ctx.output().format();
672        let is_quiet = ctx.is_quiet();
673        let is_verbose = ctx.is_verbose();
674        let is_debug = ctx.is_debug();
675
676        if !is_quiet {
677            if matches!(format, OutputFormat::Table) {
678                let output = ctx.output();
679                output.header("BACnet/IP Simulator");
680                output.kv("Bind Address", self.bind_addr);
681                output.kv("Device Instance", self.device_instance);
682                output.kv("Objects", self.objects);
683                output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
684            }
685        }
686
687        // Verbose: show extra details
688        if is_verbose {
689            let per_type = self.objects / 4;
690            ctx.vprintln(format!("  Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
691            ctx.vprintln(format!("  Device Name: Mabinogion BACnet Simulator"));
692        }
693
694        // Debug: dump full configuration
695        if is_debug {
696            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
697            ctx.dprintln(format!("Device instance: {}", self.device_instance));
698            ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
699        }
700
701        self.start_server(ctx).await?;
702
703        let per_type = self.objects / 4;
704
705        if !is_quiet {
706            match format {
707                OutputFormat::Table => {
708                    let colors_enabled = ctx.colors_enabled();
709                    let table = TableBuilder::new(colors_enabled)
710                        .header(["Object Type", "Count", "Status"])
711                        .status_row(["Device", "1", "Online"], StatusType::Success)
712                        .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
713                        .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
714                        .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
715                        .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
716                    table.print();
717                }
718                _ => {
719                    #[derive(Serialize)]
720                    struct BacnetServerInfo {
721                        protocol: String,
722                        bind_address: String,
723                        device_instance: u32,
724                        objects: usize,
725                        bbmd_enabled: bool,
726                        object_types: Vec<ObjectTypeInfo>,
727                        status: String,
728                    }
729                    #[derive(Serialize)]
730                    struct ObjectTypeInfo {
731                        object_type: String,
732                        count: usize,
733                        status: String,
734                    }
735                    let info = BacnetServerInfo {
736                        protocol: "BACnet/IP".into(),
737                        bind_address: self.bind_addr.to_string(),
738                        device_instance: self.device_instance,
739                        objects: self.objects,
740                        bbmd_enabled: self.bbmd_enabled,
741                        object_types: vec![
742                            ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
743                            ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
744                            ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
745                            ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
746                            ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
747                        ],
748                        status: "Online".into(),
749                    };
750                    let _ = ctx.output().write(&info);
751                }
752            }
753        }
754
755        if !is_quiet {
756            ctx.output().info("Press Ctrl+C to stop");
757        }
758        ctx.shutdown_signal().notified().await;
759
760        self.stop_server(ctx).await?;
761        if !is_quiet {
762            ctx.output().success("BACnet simulator stopped");
763        }
764
765        Ok(CommandOutput::quiet_success())
766    }
767}
768
769#[async_trait]
770impl ProtocolCommand for BacnetCommand {
771    fn protocol(&self) -> Protocol {
772        Protocol::BacnetIp
773    }
774
775    fn default_port(&self) -> u16 {
776        47808
777    }
778
779    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
780        let output = ctx.output();
781        let spinner = output.spinner("Starting BACnet server...");
782
783        let config = BacnetServerConfig::new(self.device_instance)
784            .with_bind_addr(self.bind_addr)
785            .with_device_name("Mabinogion BACnet Simulator");
786
787        // Create object registry with sample objects
788        let registry = ObjectRegistry::new();
789
790        let objects_per_type = self.objects / 4;
791        for i in 0..objects_per_type {
792            let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
793            registry.register(Arc::new(ai));
794        }
795        for i in 0..objects_per_type {
796            let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
797            registry.register(Arc::new(ao));
798        }
799        for i in 0..objects_per_type {
800            let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
801            registry.register(Arc::new(bi));
802        }
803        for i in 0..objects_per_type {
804            let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
805            registry.register(Arc::new(bo));
806        }
807
808        let server = Arc::new(BACnetServer::new(config, registry));
809
810        {
811            let mut server_guard = self.server.lock().await;
812            *server_guard = Some(server.clone());
813        }
814
815        let server_clone = server.clone();
816        let task = tokio::spawn(async move {
817            if let Err(e) = server_clone.run().await {
818                tracing::error!("BACnet server error: {}", e);
819            }
820        });
821
822        {
823            let mut task_guard = self.server_task.lock().await;
824            *task_guard = Some(task);
825        }
826
827        tokio::time::sleep(Duration::from_millis(100)).await;
828
829        spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
830        Ok(())
831    }
832
833    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
834        if let Some(server) = self.server.lock().await.as_ref() {
835            server.shutdown();
836        }
837
838        if let Some(task) = self.server_task.lock().await.take() {
839            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
840        }
841
842        Ok(())
843    }
844}
845
846// =============================================================================
847// KNX Command
848// =============================================================================
849
850/// KNX protocol command.
851pub struct KnxCommand {
852    bind_addr: SocketAddr,
853    individual_address: String,
854    group_objects: usize,
855    /// Device tags.
856    tags: Tags,
857    /// Server instance (for shutdown).
858    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
859    /// Server task handle.
860    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
861}
862
863impl KnxCommand {
864    pub fn new() -> Self {
865        Self {
866            bind_addr: "0.0.0.0:3671".parse().unwrap(),
867            individual_address: "1.1.1".into(),
868            group_objects: 100,
869            tags: Tags::new(),
870            server: Arc::new(Mutex::new(None)),
871            server_task: Arc::new(Mutex::new(None)),
872        }
873    }
874
875    pub fn with_port(mut self, port: u16) -> Self {
876        self.bind_addr.set_port(port);
877        self
878    }
879
880    pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
881        self.individual_address = addr.into();
882        self
883    }
884
885    pub fn with_group_objects(mut self, count: usize) -> Self {
886        self.group_objects = count;
887        self
888    }
889
890    pub fn with_tags(mut self, tags: Tags) -> Self {
891        self.tags = tags;
892        self
893    }
894}
895
896impl Default for KnxCommand {
897    fn default() -> Self {
898        Self::new()
899    }
900}
901
902#[async_trait]
903impl Command for KnxCommand {
904    fn name(&self) -> &str {
905        "knx"
906    }
907
908    fn description(&self) -> &str {
909        "Start a KNXnet/IP simulator"
910    }
911
912    fn requires_engine(&self) -> bool {
913        true
914    }
915
916    fn supports_shutdown(&self) -> bool {
917        true
918    }
919
920    async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
921        let format = ctx.output().format();
922        let is_quiet = ctx.is_quiet();
923        let is_verbose = ctx.is_verbose();
924        let is_debug = ctx.is_debug();
925
926        if !is_quiet {
927            if matches!(format, OutputFormat::Table) {
928                let output = ctx.output();
929                output.header("KNXnet/IP Simulator");
930                output.kv("Bind Address", self.bind_addr);
931                output.kv("Individual Address", &self.individual_address);
932                output.kv("Group Objects", self.group_objects);
933            }
934        }
935
936        // Verbose: show extra details
937        if is_verbose {
938            ctx.vprintln(format!("  Max Connections: 10"));
939            ctx.vprintln(format!("  Services: Core, Device Management, Tunneling"));
940        }
941
942        // Debug: dump full configuration
943        if is_debug {
944            ctx.dprintln(format!("Bind address: {}", self.bind_addr));
945            ctx.dprintln(format!("Individual address: {}", self.individual_address));
946            ctx.dprintln(format!("Group objects: {}", self.group_objects));
947        }
948
949        self.start_server(ctx).await?;
950
951        if !is_quiet {
952            match format {
953                OutputFormat::Table => {
954                    let colors_enabled = ctx.colors_enabled();
955                    let table = TableBuilder::new(colors_enabled)
956                        .header(["Service", "Status"])
957                        .status_row(["Core", "Ready"], StatusType::Success)
958                        .status_row(["Device Management", "Ready"], StatusType::Success)
959                        .status_row(["Tunneling", "Ready"], StatusType::Success);
960                    table.print();
961                }
962                _ => {
963                    #[derive(Serialize)]
964                    struct KnxServerInfo {
965                        protocol: String,
966                        bind_address: String,
967                        individual_address: String,
968                        group_objects: usize,
969                        services: Vec<ServiceInfo>,
970                        status: String,
971                    }
972                    #[derive(Serialize)]
973                    struct ServiceInfo {
974                        service: String,
975                        status: String,
976                    }
977                    let info = KnxServerInfo {
978                        protocol: "KNXnet/IP".into(),
979                        bind_address: self.bind_addr.to_string(),
980                        individual_address: self.individual_address.clone(),
981                        group_objects: self.group_objects,
982                        services: vec![
983                            ServiceInfo { service: "Core".into(), status: "Ready".into() },
984                            ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
985                            ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
986                        ],
987                        status: "Online".into(),
988                    };
989                    let _ = ctx.output().write(&info);
990                }
991            }
992        }
993
994        if !is_quiet {
995            ctx.output().info("Press Ctrl+C to stop");
996        }
997        ctx.shutdown_signal().notified().await;
998
999        self.stop_server(ctx).await?;
1000        if !is_quiet {
1001            ctx.output().success("KNX simulator stopped");
1002        }
1003
1004        Ok(CommandOutput::quiet_success())
1005    }
1006}
1007
1008#[async_trait]
1009impl ProtocolCommand for KnxCommand {
1010    fn protocol(&self) -> Protocol {
1011        Protocol::KnxIp
1012    }
1013
1014    fn default_port(&self) -> u16 {
1015        3671
1016    }
1017
1018    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1019        let output = ctx.output();
1020        let spinner = output.spinner("Starting KNX server...");
1021
1022        // Parse individual address
1023        let individual_address: IndividualAddress = self.individual_address.parse()
1024            .map_err(|_| crate::error::CliError::ExecutionFailed {
1025                message: format!("Invalid individual address: {}", self.individual_address)
1026            })?;
1027
1028        let config = KnxServerConfig {
1029            bind_addr: self.bind_addr,
1030            individual_address,
1031            max_connections: 256,
1032            ..Default::default()
1033        };
1034
1035        // Create group objects based on --groups parameter
1036        let group_table = Arc::new(GroupObjectTable::new());
1037        let dpt_types = [
1038            DptId::new(1, 1),   // Switch (bool)
1039            DptId::new(5, 1),   // Scaling (0-100%)
1040            DptId::new(9, 1),   // Temperature (float16)
1041            DptId::new(9, 4),   // Lux
1042            DptId::new(9, 7),   // Humidity
1043            DptId::new(12, 1),  // Counter (u32)
1044            DptId::new(13, 1),  // Counter signed (i32)
1045            DptId::new(14, 56), // Float (f32)
1046        ];
1047        let dpt_names = [
1048            "Switch", "Scaling", "Temperature", "Lux",
1049            "Humidity", "Counter", "SignedCounter", "Float",
1050        ];
1051
1052        for i in 0..self.group_objects {
1053            let main = ((i / 256) + 1) as u8;
1054            let middle = ((i / 8) % 8) as u8;
1055            let sub = (i % 256) as u8;
1056            let addr = GroupAddress::three_level(main, middle, sub);
1057            let dpt_idx = i % dpt_types.len();
1058            let name = format!("{}_{}", dpt_names[dpt_idx], i);
1059            if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1060                tracing::warn!("Failed to create group object {}: {}", i, e);
1061            }
1062        }
1063
1064        let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1065
1066        {
1067            let mut server_guard = self.server.lock().await;
1068            *server_guard = Some(server.clone());
1069        }
1070
1071        let server_clone = server.clone();
1072        let task = tokio::spawn(async move {
1073            if let Err(e) = server_clone.start().await {
1074                tracing::error!("KNX server error: {}", e);
1075            }
1076        });
1077
1078        {
1079            let mut task_guard = self.server_task.lock().await;
1080            *task_guard = Some(task);
1081        }
1082
1083        tokio::time::sleep(Duration::from_millis(100)).await;
1084
1085        spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1086        Ok(())
1087    }
1088
1089    async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1090        // Take server out to call stop (KnxServer::stop has Send issues with parking_lot)
1091        let server_opt = self.server.lock().await.take();
1092        if let Some(server) = server_opt {
1093            // Use spawn_blocking to handle the non-Send future
1094            let _ = tokio::task::spawn_blocking(move || {
1095                let rt = tokio::runtime::Handle::current();
1096                rt.block_on(async {
1097                    let _ = server.stop().await;
1098                })
1099            }).await;
1100        }
1101
1102        if let Some(task) = self.server_task.lock().await.take() {
1103            let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1104        }
1105
1106        Ok(())
1107    }
1108}