1use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
21use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
22use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, default_object_descriptors};
23use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
24
25#[async_trait]
27pub trait ProtocolCommand: Command {
28 fn protocol(&self) -> Protocol;
30
31 fn default_port(&self) -> u16;
33
34 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
36
37 async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
39}
40
41pub struct ModbusCommand {
47 bind_addr: SocketAddr,
49 devices: usize,
51 points_per_device: usize,
53 rtu_mode: bool,
55 serial_port: Option<String>,
57 tags: Tags,
59 server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
61 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
63}
64
65impl ModbusCommand {
66 pub fn new() -> Self {
67 Self {
68 bind_addr: "0.0.0.0:502".parse().unwrap(),
69 devices: 1,
70 points_per_device: 100,
71 rtu_mode: false,
72 serial_port: None,
73 tags: Tags::new(),
74 server: Arc::new(Mutex::new(None)),
75 server_task: Arc::new(Mutex::new(None)),
76 }
77 }
78
79 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
80 self.bind_addr = addr;
81 self
82 }
83
84 pub fn with_port(mut self, port: u16) -> Self {
85 self.bind_addr.set_port(port);
86 self
87 }
88
89 pub fn with_devices(mut self, devices: usize) -> Self {
90 self.devices = devices;
91 self
92 }
93
94 pub fn with_points(mut self, points: usize) -> Self {
95 self.points_per_device = points;
96 self
97 }
98
99 pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
100 self.rtu_mode = true;
101 self.serial_port = Some(serial_port.into());
102 self
103 }
104
105 pub fn with_tags(mut self, tags: Tags) -> Self {
106 self.tags = tags;
107 self
108 }
109}
110
111impl Default for ModbusCommand {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
117#[async_trait]
118impl Command for ModbusCommand {
119 fn name(&self) -> &str {
120 "modbus"
121 }
122
123 fn description(&self) -> &str {
124 "Start a Modbus TCP/RTU simulator"
125 }
126
127 fn requires_engine(&self) -> bool {
128 true
129 }
130
131 fn supports_shutdown(&self) -> bool {
132 true
133 }
134
135 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
136 let format = ctx.output().format();
137 let is_quiet = ctx.is_quiet();
138 let is_verbose = ctx.is_verbose();
139 let is_debug = ctx.is_debug();
140
141 if !is_quiet {
142 if matches!(format, OutputFormat::Table) {
143 let output = ctx.output();
144 if self.rtu_mode {
145 output.header("Modbus RTU Simulator");
146 output.kv(
147 "Serial Port",
148 self.serial_port.as_deref().unwrap_or("N/A"),
149 );
150 } else {
151 output.header("Modbus TCP Simulator");
152 output.kv("Bind Address", self.bind_addr);
153 }
154 output.kv("Devices", self.devices);
155 output.kv("Points per Device", self.points_per_device);
156 output.kv("Total Points", self.devices * self.points_per_device);
157 }
158 }
159
160 if is_verbose {
162 ctx.vprintln(format!(" Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
163 ctx.vprintln(format!(" Points Distribution: {} per register type", self.points_per_device / 4));
164 }
165
166 if is_debug {
168 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
169 ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
170 ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
171 }
172
173 self.start_server(ctx).await?;
174
175 let points_per_type = self.points_per_device / 4;
176
177 if !is_quiet {
178 match format {
179 OutputFormat::Table => {
180 let colors_enabled = ctx.colors_enabled();
181 let builder = TableBuilder::new(colors_enabled)
182 .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
183
184 let devices = self.devices;
185 let pts = points_per_type.to_string();
186 let table = PaginatedTable::default().render(builder, devices, 6, |i| {
187 let unit_id = (i + 1).to_string();
188 (
189 vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
190 StatusType::Success,
191 )
192 });
193 table.print();
194 }
195 _ => {
196 #[derive(Serialize)]
197 struct ModbusServerInfo {
198 protocol: String,
199 bind_address: String,
200 devices: usize,
201 points_per_device: usize,
202 total_points: usize,
203 rtu_mode: bool,
204 serial_port: Option<String>,
205 device_list: Vec<ModbusDeviceInfo>,
206 status: String,
207 }
208 #[derive(Serialize)]
209 struct ModbusDeviceInfo {
210 unit_id: usize,
211 holding_registers: usize,
212 input_registers: usize,
213 coils: usize,
214 discrete_inputs: usize,
215 status: String,
216 }
217 let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
218 .map(|i| ModbusDeviceInfo {
219 unit_id: i + 1,
220 holding_registers: points_per_type,
221 input_registers: points_per_type,
222 coils: points_per_type,
223 discrete_inputs: points_per_type,
224 status: "Online".into(),
225 })
226 .collect();
227 let info = ModbusServerInfo {
228 protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
229 bind_address: self.bind_addr.to_string(),
230 devices: self.devices,
231 points_per_device: self.points_per_device,
232 total_points: self.devices * self.points_per_device,
233 rtu_mode: self.rtu_mode,
234 serial_port: self.serial_port.clone(),
235 device_list,
236 status: "Online".into(),
237 };
238 let _ = ctx.output().write(&info);
239 }
240 }
241 }
242
243 if !is_quiet {
244 ctx.output().info("Press Ctrl+C to stop");
245 }
246 ctx.shutdown_signal().notified().await;
247
248 self.stop_server(ctx).await?;
249 if !is_quiet {
250 ctx.output().success("Modbus simulator stopped");
251 }
252
253 Ok(CommandOutput::quiet_success())
254 }
255}
256
257#[async_trait]
258impl ProtocolCommand for ModbusCommand {
259 fn protocol(&self) -> Protocol {
260 if self.rtu_mode {
261 Protocol::ModbusRtu
262 } else {
263 Protocol::ModbusTcp
264 }
265 }
266
267 fn default_port(&self) -> u16 {
268 502
269 }
270
271 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
272 let output = ctx.output();
273 let spinner = output.spinner("Starting Modbus server...");
274
275 let config = ServerConfigV2 {
276 bind_address: self.bind_addr,
277 ..Default::default()
278 };
279
280 let server = Arc::new(ModbusTcpServerV2::new(config));
281
282 for i in 0..self.devices {
283 let unit_id = (i + 1) as u8;
284 let points = (self.points_per_device / 4) as u16;
285 let device_config = ModbusDeviceConfig {
286 unit_id,
287 name: format!("Device-{}", unit_id),
288 holding_registers: points,
289 input_registers: points,
290 coils: points,
291 discrete_inputs: points,
292 response_delay_ms: 0,
293 tags: self.tags.clone(),
294 };
295 let device = ModbusDevice::new(device_config);
296 server.add_device(device);
297 }
298
299 {
300 let mut server_guard = self.server.lock().await;
301 *server_guard = Some(server.clone());
302 }
303
304 let server_clone = server.clone();
305 let task = tokio::spawn(async move {
306 if let Err(e) = server_clone.run().await {
307 tracing::error!("Modbus server error: {}", e);
308 }
309 });
310
311 {
312 let mut task_guard = self.server_task.lock().await;
313 *task_guard = Some(task);
314 }
315
316 tokio::time::sleep(Duration::from_millis(100)).await;
317
318 spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
319 Ok(())
320 }
321
322 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
323 if let Some(server) = self.server.lock().await.as_ref() {
324 server.shutdown();
325 }
326
327 if let Some(task) = self.server_task.lock().await.take() {
328 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
329 }
330
331 Ok(())
332 }
333}
334
335pub struct OpcuaCommand {
341 bind_addr: SocketAddr,
342 endpoint_path: String,
343 nodes: usize,
344 security_mode: String,
345 tags: Tags,
347 server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
349 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
351}
352
353impl OpcuaCommand {
354 pub fn new() -> Self {
355 Self {
356 bind_addr: "0.0.0.0:4840".parse().unwrap(),
357 endpoint_path: "/".into(),
358 nodes: 1000,
359 security_mode: "None".into(),
360 tags: Tags::new(),
361 server: Arc::new(Mutex::new(None)),
362 server_task: Arc::new(Mutex::new(None)),
363 }
364 }
365
366 pub fn with_port(mut self, port: u16) -> Self {
367 self.bind_addr.set_port(port);
368 self
369 }
370
371 pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
372 self.endpoint_path = path.into();
373 self
374 }
375
376 pub fn with_nodes(mut self, nodes: usize) -> Self {
377 self.nodes = nodes;
378 self
379 }
380
381 pub fn with_security(mut self, mode: impl Into<String>) -> Self {
382 self.security_mode = mode.into();
383 self
384 }
385
386 pub fn with_tags(mut self, tags: Tags) -> Self {
387 self.tags = tags;
388 self
389 }
390}
391
392impl Default for OpcuaCommand {
393 fn default() -> Self {
394 Self::new()
395 }
396}
397
398#[async_trait]
399impl Command for OpcuaCommand {
400 fn name(&self) -> &str {
401 "opcua"
402 }
403
404 fn description(&self) -> &str {
405 "Start an OPC UA server simulator"
406 }
407
408 fn requires_engine(&self) -> bool {
409 true
410 }
411
412 fn supports_shutdown(&self) -> bool {
413 true
414 }
415
416 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
417 let format = ctx.output().format();
418 let is_quiet = ctx.is_quiet();
419 let is_verbose = ctx.is_verbose();
420 let is_debug = ctx.is_debug();
421
422 if !is_quiet {
423 if matches!(format, OutputFormat::Table) {
424 let output = ctx.output();
425 output.header("OPC UA Simulator");
426 output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
427 output.kv("Nodes", self.nodes);
428 output.kv("Security Mode", &self.security_mode);
429 }
430 }
431
432 if is_verbose {
434 ctx.vprintln(format!(" Bind Address: {}", self.bind_addr));
435 ctx.vprintln(format!(" Endpoint Path: {}", self.endpoint_path));
436 ctx.vprintln(format!(" Max Subscriptions: 1000"));
437 ctx.vprintln(format!(" Max Monitored Items: 10000"));
438 }
439
440 if is_debug {
442 ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
443 ctx.dprintln(format!("Node count: {}", self.nodes));
444 ctx.dprintln(format!("Security mode: {}", self.security_mode));
445 ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
446 }
447
448 self.start_server(ctx).await?;
449
450 if !is_quiet {
451 match format {
452 OutputFormat::Table => {
453 let colors_enabled = ctx.colors_enabled();
454 let table = TableBuilder::new(colors_enabled)
455 .header(["Namespace", "Nodes", "Subscriptions", "Status"])
456 .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
457 .status_row(
458 ["1", &self.nodes.to_string(), "0", "Online"],
459 StatusType::Success,
460 );
461 table.print();
462 }
463 _ => {
464 #[derive(Serialize)]
465 struct OpcuaServerInfo {
466 protocol: String,
467 endpoint: String,
468 nodes: usize,
469 security_mode: String,
470 namespaces: Vec<NamespaceInfo>,
471 status: String,
472 }
473 #[derive(Serialize)]
474 struct NamespaceInfo {
475 index: u32,
476 nodes: String,
477 subscriptions: u32,
478 status: String,
479 }
480 let info = OpcuaServerInfo {
481 protocol: "OPC UA".into(),
482 endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
483 nodes: self.nodes,
484 security_mode: self.security_mode.clone(),
485 namespaces: vec![
486 NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
487 NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
488 ],
489 status: "Online".into(),
490 };
491 let _ = ctx.output().write(&info);
492 }
493 }
494 }
495
496 if !is_quiet {
497 ctx.output().info("Press Ctrl+C to stop");
498 }
499 ctx.shutdown_signal().notified().await;
500
501 self.stop_server(ctx).await?;
502 if !is_quiet {
503 ctx.output().success("OPC UA simulator stopped");
504 }
505
506 Ok(CommandOutput::quiet_success())
507 }
508}
509
510#[async_trait]
511impl ProtocolCommand for OpcuaCommand {
512 fn protocol(&self) -> Protocol {
513 Protocol::OpcUa
514 }
515
516 fn default_port(&self) -> u16 {
517 4840
518 }
519
520 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
521 let output = ctx.output();
522 let spinner = output.spinner("Starting OPC UA server...");
523
524 let config = OpcUaServerConfig {
525 endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
526 server_name: "Mabinogion OPC UA Simulator".to_string(),
527 max_subscriptions: 1000,
528 max_monitored_items: 10000,
529 ..Default::default()
530 };
531
532 let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
533 crate::error::CliError::ExecutionFailed {
534 message: format!("Failed to create OPC UA server: {}", e)
535 }
536 })?);
537
538 let node_count = self.nodes.min(100);
541 for i in 0..node_count {
542 let node_id = format!("ns=2;i={}", 1000 + i);
543 let name = format!("Variable_{}", i);
544 let value = (i as f64) * 0.1;
545
546 if i % 2 == 0 {
547 let _ = server.add_writable_variable(node_id, name, value);
548 } else {
549 let _ = server.add_variable(node_id, name, value);
550 }
551 }
552
553 {
554 let mut server_guard = self.server.lock().await;
555 *server_guard = Some(server.clone());
556 }
557
558 let server_clone = server.clone();
559 let task = tokio::spawn(async move {
560 if let Err(e) = server_clone.start().await {
561 tracing::error!("OPC UA server error: {}", e);
562 }
563 });
564
565 {
566 let mut task_guard = self.server_task.lock().await;
567 *task_guard = Some(task);
568 }
569
570 tokio::time::sleep(Duration::from_millis(100)).await;
571
572 spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
573 Ok(())
574 }
575
576 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
577 if let Some(server) = self.server.lock().await.as_ref() {
578 let _ = server.stop().await;
579 }
580
581 if let Some(task) = self.server_task.lock().await.take() {
582 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
583 }
584
585 Ok(())
586 }
587}
588
589pub struct BacnetCommand {
595 bind_addr: SocketAddr,
596 device_instance: u32,
597 objects: usize,
598 bbmd_enabled: bool,
599 tags: Tags,
601 server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
603 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
605}
606
607impl BacnetCommand {
608 pub fn new() -> Self {
609 Self {
610 bind_addr: "0.0.0.0:47808".parse().unwrap(),
611 device_instance: 1234,
612 objects: 100,
613 bbmd_enabled: false,
614 tags: Tags::new(),
615 server: Arc::new(Mutex::new(None)),
616 server_task: Arc::new(Mutex::new(None)),
617 }
618 }
619
620 pub fn with_port(mut self, port: u16) -> Self {
621 self.bind_addr.set_port(port);
622 self
623 }
624
625 pub fn with_device_instance(mut self, instance: u32) -> Self {
626 self.device_instance = instance;
627 self
628 }
629
630 pub fn with_objects(mut self, objects: usize) -> Self {
631 self.objects = objects;
632 self
633 }
634
635 pub fn with_bbmd(mut self, enabled: bool) -> Self {
636 self.bbmd_enabled = enabled;
637 self
638 }
639
640 pub fn with_tags(mut self, tags: Tags) -> Self {
641 self.tags = tags;
642 self
643 }
644}
645
646impl Default for BacnetCommand {
647 fn default() -> Self {
648 Self::new()
649 }
650}
651
652#[async_trait]
653impl Command for BacnetCommand {
654 fn name(&self) -> &str {
655 "bacnet"
656 }
657
658 fn description(&self) -> &str {
659 "Start a BACnet/IP simulator"
660 }
661
662 fn requires_engine(&self) -> bool {
663 true
664 }
665
666 fn supports_shutdown(&self) -> bool {
667 true
668 }
669
670 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
671 let format = ctx.output().format();
672 let is_quiet = ctx.is_quiet();
673 let is_verbose = ctx.is_verbose();
674 let is_debug = ctx.is_debug();
675
676 if !is_quiet {
677 if matches!(format, OutputFormat::Table) {
678 let output = ctx.output();
679 output.header("BACnet/IP Simulator");
680 output.kv("Bind Address", self.bind_addr);
681 output.kv("Device Instance", self.device_instance);
682 output.kv("Objects", self.objects);
683 output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
684 }
685 }
686
687 if is_verbose {
689 let per_type = self.objects / 4;
690 ctx.vprintln(format!(" Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
691 ctx.vprintln(format!(" Device Name: Mabinogion BACnet Simulator"));
692 }
693
694 if is_debug {
696 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
697 ctx.dprintln(format!("Device instance: {}", self.device_instance));
698 ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
699 }
700
701 self.start_server(ctx).await?;
702
703 let per_type = self.objects / 4;
704
705 if !is_quiet {
706 match format {
707 OutputFormat::Table => {
708 let colors_enabled = ctx.colors_enabled();
709 let table = TableBuilder::new(colors_enabled)
710 .header(["Object Type", "Count", "Status"])
711 .status_row(["Device", "1", "Online"], StatusType::Success)
712 .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
713 .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
714 .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
715 .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
716 table.print();
717 }
718 _ => {
719 #[derive(Serialize)]
720 struct BacnetServerInfo {
721 protocol: String,
722 bind_address: String,
723 device_instance: u32,
724 objects: usize,
725 bbmd_enabled: bool,
726 object_types: Vec<ObjectTypeInfo>,
727 status: String,
728 }
729 #[derive(Serialize)]
730 struct ObjectTypeInfo {
731 object_type: String,
732 count: usize,
733 status: String,
734 }
735 let info = BacnetServerInfo {
736 protocol: "BACnet/IP".into(),
737 bind_address: self.bind_addr.to_string(),
738 device_instance: self.device_instance,
739 objects: self.objects,
740 bbmd_enabled: self.bbmd_enabled,
741 object_types: vec![
742 ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
743 ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
744 ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
745 ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
746 ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
747 ],
748 status: "Online".into(),
749 };
750 let _ = ctx.output().write(&info);
751 }
752 }
753 }
754
755 if !is_quiet {
756 ctx.output().info("Press Ctrl+C to stop");
757 }
758 ctx.shutdown_signal().notified().await;
759
760 self.stop_server(ctx).await?;
761 if !is_quiet {
762 ctx.output().success("BACnet simulator stopped");
763 }
764
765 Ok(CommandOutput::quiet_success())
766 }
767}
768
769#[async_trait]
770impl ProtocolCommand for BacnetCommand {
771 fn protocol(&self) -> Protocol {
772 Protocol::BacnetIp
773 }
774
775 fn default_port(&self) -> u16 {
776 47808
777 }
778
779 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
780 let output = ctx.output();
781 let spinner = output.spinner("Starting BACnet server...");
782
783 let config = BacnetServerConfig::new(self.device_instance)
784 .with_bind_addr(self.bind_addr)
785 .with_device_name("Mabinogion BACnet Simulator");
786
787 let registry = ObjectRegistry::new();
789
790 let descriptors = default_object_descriptors();
791 let objects_per_type = self.objects / descriptors.len();
792 registry.populate_standard_objects(&descriptors, objects_per_type);
793
794 let server = Arc::new(BACnetServer::new(config, registry));
795
796 {
797 let mut server_guard = self.server.lock().await;
798 *server_guard = Some(server.clone());
799 }
800
801 let server_clone = server.clone();
802 let task = tokio::spawn(async move {
803 if let Err(e) = server_clone.run().await {
804 tracing::error!("BACnet server error: {}", e);
805 }
806 });
807
808 {
809 let mut task_guard = self.server_task.lock().await;
810 *task_guard = Some(task);
811 }
812
813 tokio::time::sleep(Duration::from_millis(100)).await;
814
815 spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
816 Ok(())
817 }
818
819 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
820 if let Some(server) = self.server.lock().await.as_ref() {
821 server.shutdown();
822 }
823
824 if let Some(task) = self.server_task.lock().await.take() {
825 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
826 }
827
828 Ok(())
829 }
830}
831
832pub struct KnxCommand {
838 bind_addr: SocketAddr,
839 individual_address: String,
840 group_objects: usize,
841 tags: Tags,
843 server: Arc<Mutex<Option<Arc<KnxServer>>>>,
845 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
847}
848
849impl KnxCommand {
850 pub fn new() -> Self {
851 Self {
852 bind_addr: "0.0.0.0:3671".parse().unwrap(),
853 individual_address: "1.1.1".into(),
854 group_objects: 100,
855 tags: Tags::new(),
856 server: Arc::new(Mutex::new(None)),
857 server_task: Arc::new(Mutex::new(None)),
858 }
859 }
860
861 pub fn with_port(mut self, port: u16) -> Self {
862 self.bind_addr.set_port(port);
863 self
864 }
865
866 pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
867 self.individual_address = addr.into();
868 self
869 }
870
871 pub fn with_group_objects(mut self, count: usize) -> Self {
872 self.group_objects = count;
873 self
874 }
875
876 pub fn with_tags(mut self, tags: Tags) -> Self {
877 self.tags = tags;
878 self
879 }
880}
881
882impl Default for KnxCommand {
883 fn default() -> Self {
884 Self::new()
885 }
886}
887
888#[async_trait]
889impl Command for KnxCommand {
890 fn name(&self) -> &str {
891 "knx"
892 }
893
894 fn description(&self) -> &str {
895 "Start a KNXnet/IP simulator"
896 }
897
898 fn requires_engine(&self) -> bool {
899 true
900 }
901
902 fn supports_shutdown(&self) -> bool {
903 true
904 }
905
906 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
907 let format = ctx.output().format();
908 let is_quiet = ctx.is_quiet();
909 let is_verbose = ctx.is_verbose();
910 let is_debug = ctx.is_debug();
911
912 if !is_quiet {
913 if matches!(format, OutputFormat::Table) {
914 let output = ctx.output();
915 output.header("KNXnet/IP Simulator");
916 output.kv("Bind Address", self.bind_addr);
917 output.kv("Individual Address", &self.individual_address);
918 output.kv("Group Objects", self.group_objects);
919 }
920 }
921
922 if is_verbose {
924 ctx.vprintln(format!(" Max Connections: 10"));
925 ctx.vprintln(format!(" Services: Core, Device Management, Tunneling"));
926 }
927
928 if is_debug {
930 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
931 ctx.dprintln(format!("Individual address: {}", self.individual_address));
932 ctx.dprintln(format!("Group objects: {}", self.group_objects));
933 }
934
935 self.start_server(ctx).await?;
936
937 if !is_quiet {
938 match format {
939 OutputFormat::Table => {
940 let colors_enabled = ctx.colors_enabled();
941 let table = TableBuilder::new(colors_enabled)
942 .header(["Service", "Status"])
943 .status_row(["Core", "Ready"], StatusType::Success)
944 .status_row(["Device Management", "Ready"], StatusType::Success)
945 .status_row(["Tunneling", "Ready"], StatusType::Success);
946 table.print();
947 }
948 _ => {
949 #[derive(Serialize)]
950 struct KnxServerInfo {
951 protocol: String,
952 bind_address: String,
953 individual_address: String,
954 group_objects: usize,
955 services: Vec<ServiceInfo>,
956 status: String,
957 }
958 #[derive(Serialize)]
959 struct ServiceInfo {
960 service: String,
961 status: String,
962 }
963 let info = KnxServerInfo {
964 protocol: "KNXnet/IP".into(),
965 bind_address: self.bind_addr.to_string(),
966 individual_address: self.individual_address.clone(),
967 group_objects: self.group_objects,
968 services: vec![
969 ServiceInfo { service: "Core".into(), status: "Ready".into() },
970 ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
971 ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
972 ],
973 status: "Online".into(),
974 };
975 let _ = ctx.output().write(&info);
976 }
977 }
978 }
979
980 if !is_quiet {
981 ctx.output().info("Press Ctrl+C to stop");
982 }
983 ctx.shutdown_signal().notified().await;
984
985 self.stop_server(ctx).await?;
986 if !is_quiet {
987 ctx.output().success("KNX simulator stopped");
988 }
989
990 Ok(CommandOutput::quiet_success())
991 }
992}
993
994#[async_trait]
995impl ProtocolCommand for KnxCommand {
996 fn protocol(&self) -> Protocol {
997 Protocol::KnxIp
998 }
999
1000 fn default_port(&self) -> u16 {
1001 3671
1002 }
1003
1004 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1005 let output = ctx.output();
1006 let spinner = output.spinner("Starting KNX server...");
1007
1008 let individual_address: IndividualAddress = self.individual_address.parse()
1010 .map_err(|_| crate::error::CliError::ExecutionFailed {
1011 message: format!("Invalid individual address: {}", self.individual_address)
1012 })?;
1013
1014 let config = KnxServerConfig {
1015 bind_addr: self.bind_addr,
1016 individual_address,
1017 max_connections: 256,
1018 ..Default::default()
1019 };
1020
1021 let group_table = Arc::new(GroupObjectTable::new());
1023 let dpt_types = [
1024 DptId::new(1, 1), DptId::new(5, 1), DptId::new(9, 1), DptId::new(9, 4), DptId::new(9, 7), DptId::new(12, 1), DptId::new(13, 1), DptId::new(14, 56), ];
1033 let dpt_names = [
1034 "Switch", "Scaling", "Temperature", "Lux",
1035 "Humidity", "Counter", "SignedCounter", "Float",
1036 ];
1037
1038 for i in 0..self.group_objects {
1039 let main = ((i / 256) + 1) as u8;
1040 let middle = ((i / 8) % 8) as u8;
1041 let sub = (i % 256) as u8;
1042 let addr = GroupAddress::three_level(main, middle, sub);
1043 let dpt_idx = i % dpt_types.len();
1044 let name = format!("{}_{}", dpt_names[dpt_idx], i);
1045 if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1046 tracing::warn!("Failed to create group object {}: {}", i, e);
1047 }
1048 }
1049
1050 let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1051
1052 {
1053 let mut server_guard = self.server.lock().await;
1054 *server_guard = Some(server.clone());
1055 }
1056
1057 let server_clone = server.clone();
1058 let task = tokio::spawn(async move {
1059 if let Err(e) = server_clone.start().await {
1060 tracing::error!("KNX server error: {}", e);
1061 }
1062 });
1063
1064 {
1065 let mut task_guard = self.server_task.lock().await;
1066 *task_guard = Some(task);
1067 }
1068
1069 tokio::time::sleep(Duration::from_millis(100)).await;
1070
1071 spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1072 Ok(())
1073 }
1074
1075 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1076 let server_opt = self.server.lock().await.take();
1078 if let Some(server) = server_opt {
1079 let _ = tokio::task::spawn_blocking(move || {
1081 let rt = tokio::runtime::Handle::current();
1082 rt.block_on(async {
1083 let _ = server.stop().await;
1084 })
1085 }).await;
1086 }
1087
1088 if let Some(task) = self.server_task.lock().await.take() {
1089 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1090 }
1091
1092 Ok(())
1093 }
1094}