1use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use serde::Serialize;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17
18use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
20use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
21use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
22use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
23
24#[async_trait]
26pub trait ProtocolCommand: Command {
27 fn protocol(&self) -> Protocol;
29
30 fn default_port(&self) -> u16;
32
33 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
35
36 async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
38}
39
40pub struct ModbusCommand {
46 bind_addr: SocketAddr,
48 devices: usize,
50 points_per_device: usize,
52 rtu_mode: bool,
54 serial_port: Option<String>,
56 server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
58 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
60}
61
62impl ModbusCommand {
63 pub fn new() -> Self {
64 Self {
65 bind_addr: "0.0.0.0:502".parse().unwrap(),
66 devices: 1,
67 points_per_device: 100,
68 rtu_mode: false,
69 serial_port: None,
70 server: Arc::new(Mutex::new(None)),
71 server_task: Arc::new(Mutex::new(None)),
72 }
73 }
74
75 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
76 self.bind_addr = addr;
77 self
78 }
79
80 pub fn with_port(mut self, port: u16) -> Self {
81 self.bind_addr.set_port(port);
82 self
83 }
84
85 pub fn with_devices(mut self, devices: usize) -> Self {
86 self.devices = devices;
87 self
88 }
89
90 pub fn with_points(mut self, points: usize) -> Self {
91 self.points_per_device = points;
92 self
93 }
94
95 pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
96 self.rtu_mode = true;
97 self.serial_port = Some(serial_port.into());
98 self
99 }
100}
101
102impl Default for ModbusCommand {
103 fn default() -> Self {
104 Self::new()
105 }
106}
107
108#[async_trait]
109impl Command for ModbusCommand {
110 fn name(&self) -> &str {
111 "modbus"
112 }
113
114 fn description(&self) -> &str {
115 "Start a Modbus TCP/RTU simulator"
116 }
117
118 fn requires_engine(&self) -> bool {
119 true
120 }
121
122 fn supports_shutdown(&self) -> bool {
123 true
124 }
125
126 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
127 let format = ctx.output().format();
128 let is_quiet = ctx.is_quiet();
129 let is_verbose = ctx.is_verbose();
130 let is_debug = ctx.is_debug();
131
132 if !is_quiet {
133 if matches!(format, OutputFormat::Table) {
134 let output = ctx.output();
135 if self.rtu_mode {
136 output.header("Modbus RTU Simulator");
137 output.kv(
138 "Serial Port",
139 self.serial_port.as_deref().unwrap_or("N/A"),
140 );
141 } else {
142 output.header("Modbus TCP Simulator");
143 output.kv("Bind Address", self.bind_addr);
144 }
145 output.kv("Devices", self.devices);
146 output.kv("Points per Device", self.points_per_device);
147 output.kv("Total Points", self.devices * self.points_per_device);
148 }
149 }
150
151 if is_verbose {
153 ctx.vprintln(format!(" Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
154 ctx.vprintln(format!(" Points Distribution: {} per register type", self.points_per_device / 4));
155 }
156
157 if is_debug {
159 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
160 ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
161 ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
162 }
163
164 self.start_server(ctx).await?;
165
166 let points_per_type = self.points_per_device / 4;
167
168 if !is_quiet {
169 match format {
170 OutputFormat::Table => {
171 let colors_enabled = ctx.colors_enabled();
172 let builder = TableBuilder::new(colors_enabled)
173 .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
174
175 let devices = self.devices;
176 let pts = points_per_type.to_string();
177 let table = PaginatedTable::default().render(builder, devices, 6, |i| {
178 let unit_id = (i + 1).to_string();
179 (
180 vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
181 StatusType::Success,
182 )
183 });
184 table.print();
185 }
186 _ => {
187 #[derive(Serialize)]
188 struct ModbusServerInfo {
189 protocol: String,
190 bind_address: String,
191 devices: usize,
192 points_per_device: usize,
193 total_points: usize,
194 rtu_mode: bool,
195 serial_port: Option<String>,
196 device_list: Vec<ModbusDeviceInfo>,
197 status: String,
198 }
199 #[derive(Serialize)]
200 struct ModbusDeviceInfo {
201 unit_id: usize,
202 holding_registers: usize,
203 input_registers: usize,
204 coils: usize,
205 discrete_inputs: usize,
206 status: String,
207 }
208 let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
209 .map(|i| ModbusDeviceInfo {
210 unit_id: i + 1,
211 holding_registers: points_per_type,
212 input_registers: points_per_type,
213 coils: points_per_type,
214 discrete_inputs: points_per_type,
215 status: "Online".into(),
216 })
217 .collect();
218 let info = ModbusServerInfo {
219 protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
220 bind_address: self.bind_addr.to_string(),
221 devices: self.devices,
222 points_per_device: self.points_per_device,
223 total_points: self.devices * self.points_per_device,
224 rtu_mode: self.rtu_mode,
225 serial_port: self.serial_port.clone(),
226 device_list,
227 status: "Online".into(),
228 };
229 let _ = ctx.output().write(&info);
230 }
231 }
232 }
233
234 if !is_quiet {
235 ctx.output().info("Press Ctrl+C to stop");
236 }
237 ctx.shutdown_signal().notified().await;
238
239 self.stop_server(ctx).await?;
240 if !is_quiet {
241 ctx.output().success("Modbus simulator stopped");
242 }
243
244 Ok(CommandOutput::quiet_success())
245 }
246}
247
248#[async_trait]
249impl ProtocolCommand for ModbusCommand {
250 fn protocol(&self) -> Protocol {
251 if self.rtu_mode {
252 Protocol::ModbusRtu
253 } else {
254 Protocol::ModbusTcp
255 }
256 }
257
258 fn default_port(&self) -> u16 {
259 502
260 }
261
262 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
263 let output = ctx.output();
264 let spinner = output.spinner("Starting Modbus server...");
265
266 let config = ServerConfigV2 {
267 bind_address: self.bind_addr,
268 ..Default::default()
269 };
270
271 let server = Arc::new(ModbusTcpServerV2::new(config));
272
273 for i in 0..self.devices {
274 let unit_id = (i + 1) as u8;
275 let points = (self.points_per_device / 4) as u16;
276 let device_config = ModbusDeviceConfig {
277 unit_id,
278 name: format!("Device-{}", unit_id),
279 holding_registers: points,
280 input_registers: points,
281 coils: points,
282 discrete_inputs: points,
283 response_delay_ms: 0,
284 };
285 let device = ModbusDevice::new(device_config);
286 server.add_device(device);
287 }
288
289 {
290 let mut server_guard = self.server.lock().await;
291 *server_guard = Some(server.clone());
292 }
293
294 let server_clone = server.clone();
295 let task = tokio::spawn(async move {
296 if let Err(e) = server_clone.run().await {
297 tracing::error!("Modbus server error: {}", e);
298 }
299 });
300
301 {
302 let mut task_guard = self.server_task.lock().await;
303 *task_guard = Some(task);
304 }
305
306 tokio::time::sleep(Duration::from_millis(100)).await;
307
308 spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
309 Ok(())
310 }
311
312 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
313 if let Some(server) = self.server.lock().await.as_ref() {
314 server.shutdown();
315 }
316
317 if let Some(task) = self.server_task.lock().await.take() {
318 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
319 }
320
321 Ok(())
322 }
323}
324
325pub struct OpcuaCommand {
331 bind_addr: SocketAddr,
332 endpoint_path: String,
333 nodes: usize,
334 security_mode: String,
335 server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
337 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
339}
340
341impl OpcuaCommand {
342 pub fn new() -> Self {
343 Self {
344 bind_addr: "0.0.0.0:4840".parse().unwrap(),
345 endpoint_path: "/".into(),
346 nodes: 1000,
347 security_mode: "None".into(),
348 server: Arc::new(Mutex::new(None)),
349 server_task: Arc::new(Mutex::new(None)),
350 }
351 }
352
353 pub fn with_port(mut self, port: u16) -> Self {
354 self.bind_addr.set_port(port);
355 self
356 }
357
358 pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
359 self.endpoint_path = path.into();
360 self
361 }
362
363 pub fn with_nodes(mut self, nodes: usize) -> Self {
364 self.nodes = nodes;
365 self
366 }
367
368 pub fn with_security(mut self, mode: impl Into<String>) -> Self {
369 self.security_mode = mode.into();
370 self
371 }
372}
373
374impl Default for OpcuaCommand {
375 fn default() -> Self {
376 Self::new()
377 }
378}
379
380#[async_trait]
381impl Command for OpcuaCommand {
382 fn name(&self) -> &str {
383 "opcua"
384 }
385
386 fn description(&self) -> &str {
387 "Start an OPC UA server simulator"
388 }
389
390 fn requires_engine(&self) -> bool {
391 true
392 }
393
394 fn supports_shutdown(&self) -> bool {
395 true
396 }
397
398 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
399 let format = ctx.output().format();
400 let is_quiet = ctx.is_quiet();
401 let is_verbose = ctx.is_verbose();
402 let is_debug = ctx.is_debug();
403
404 if !is_quiet {
405 if matches!(format, OutputFormat::Table) {
406 let output = ctx.output();
407 output.header("OPC UA Simulator");
408 output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
409 output.kv("Nodes", self.nodes);
410 output.kv("Security Mode", &self.security_mode);
411 }
412 }
413
414 if is_verbose {
416 ctx.vprintln(format!(" Bind Address: {}", self.bind_addr));
417 ctx.vprintln(format!(" Endpoint Path: {}", self.endpoint_path));
418 ctx.vprintln(format!(" Max Subscriptions: 1000"));
419 ctx.vprintln(format!(" Max Monitored Items: 10000"));
420 }
421
422 if is_debug {
424 ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
425 ctx.dprintln(format!("Node count: {}", self.nodes));
426 ctx.dprintln(format!("Security mode: {}", self.security_mode));
427 ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
428 }
429
430 self.start_server(ctx).await?;
431
432 if !is_quiet {
433 match format {
434 OutputFormat::Table => {
435 let colors_enabled = ctx.colors_enabled();
436 let table = TableBuilder::new(colors_enabled)
437 .header(["Namespace", "Nodes", "Subscriptions", "Status"])
438 .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
439 .status_row(
440 ["1", &self.nodes.to_string(), "0", "Online"],
441 StatusType::Success,
442 );
443 table.print();
444 }
445 _ => {
446 #[derive(Serialize)]
447 struct OpcuaServerInfo {
448 protocol: String,
449 endpoint: String,
450 nodes: usize,
451 security_mode: String,
452 namespaces: Vec<NamespaceInfo>,
453 status: String,
454 }
455 #[derive(Serialize)]
456 struct NamespaceInfo {
457 index: u32,
458 nodes: String,
459 subscriptions: u32,
460 status: String,
461 }
462 let info = OpcuaServerInfo {
463 protocol: "OPC UA".into(),
464 endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
465 nodes: self.nodes,
466 security_mode: self.security_mode.clone(),
467 namespaces: vec![
468 NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
469 NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
470 ],
471 status: "Online".into(),
472 };
473 let _ = ctx.output().write(&info);
474 }
475 }
476 }
477
478 if !is_quiet {
479 ctx.output().info("Press Ctrl+C to stop");
480 }
481 ctx.shutdown_signal().notified().await;
482
483 self.stop_server(ctx).await?;
484 if !is_quiet {
485 ctx.output().success("OPC UA simulator stopped");
486 }
487
488 Ok(CommandOutput::quiet_success())
489 }
490}
491
492#[async_trait]
493impl ProtocolCommand for OpcuaCommand {
494 fn protocol(&self) -> Protocol {
495 Protocol::OpcUa
496 }
497
498 fn default_port(&self) -> u16 {
499 4840
500 }
501
502 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
503 let output = ctx.output();
504 let spinner = output.spinner("Starting OPC UA server...");
505
506 let config = OpcUaServerConfig {
507 endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
508 server_name: "Mabinogion OPC UA Simulator".to_string(),
509 max_subscriptions: 1000,
510 max_monitored_items: 10000,
511 ..Default::default()
512 };
513
514 let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
515 crate::error::CliError::ExecutionFailed {
516 message: format!("Failed to create OPC UA server: {}", e)
517 }
518 })?);
519
520 let node_count = self.nodes.min(100);
523 for i in 0..node_count {
524 let node_id = format!("ns=2;i={}", 1000 + i);
525 let name = format!("Variable_{}", i);
526 let value = (i as f64) * 0.1;
527
528 if i % 2 == 0 {
529 let _ = server.add_writable_variable(node_id, name, value);
530 } else {
531 let _ = server.add_variable(node_id, name, value);
532 }
533 }
534
535 {
536 let mut server_guard = self.server.lock().await;
537 *server_guard = Some(server.clone());
538 }
539
540 let server_clone = server.clone();
541 let task = tokio::spawn(async move {
542 if let Err(e) = server_clone.start().await {
543 tracing::error!("OPC UA server error: {}", e);
544 }
545 });
546
547 {
548 let mut task_guard = self.server_task.lock().await;
549 *task_guard = Some(task);
550 }
551
552 tokio::time::sleep(Duration::from_millis(100)).await;
553
554 spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
555 Ok(())
556 }
557
558 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
559 if let Some(server) = self.server.lock().await.as_ref() {
560 let _ = server.stop().await;
561 }
562
563 if let Some(task) = self.server_task.lock().await.take() {
564 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
565 }
566
567 Ok(())
568 }
569}
570
571pub struct BacnetCommand {
577 bind_addr: SocketAddr,
578 device_instance: u32,
579 objects: usize,
580 bbmd_enabled: bool,
581 server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
583 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
585}
586
587impl BacnetCommand {
588 pub fn new() -> Self {
589 Self {
590 bind_addr: "0.0.0.0:47808".parse().unwrap(),
591 device_instance: 1234,
592 objects: 100,
593 bbmd_enabled: false,
594 server: Arc::new(Mutex::new(None)),
595 server_task: Arc::new(Mutex::new(None)),
596 }
597 }
598
599 pub fn with_port(mut self, port: u16) -> Self {
600 self.bind_addr.set_port(port);
601 self
602 }
603
604 pub fn with_device_instance(mut self, instance: u32) -> Self {
605 self.device_instance = instance;
606 self
607 }
608
609 pub fn with_objects(mut self, objects: usize) -> Self {
610 self.objects = objects;
611 self
612 }
613
614 pub fn with_bbmd(mut self, enabled: bool) -> Self {
615 self.bbmd_enabled = enabled;
616 self
617 }
618}
619
620impl Default for BacnetCommand {
621 fn default() -> Self {
622 Self::new()
623 }
624}
625
626#[async_trait]
627impl Command for BacnetCommand {
628 fn name(&self) -> &str {
629 "bacnet"
630 }
631
632 fn description(&self) -> &str {
633 "Start a BACnet/IP simulator"
634 }
635
636 fn requires_engine(&self) -> bool {
637 true
638 }
639
640 fn supports_shutdown(&self) -> bool {
641 true
642 }
643
644 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
645 let format = ctx.output().format();
646 let is_quiet = ctx.is_quiet();
647 let is_verbose = ctx.is_verbose();
648 let is_debug = ctx.is_debug();
649
650 if !is_quiet {
651 if matches!(format, OutputFormat::Table) {
652 let output = ctx.output();
653 output.header("BACnet/IP Simulator");
654 output.kv("Bind Address", self.bind_addr);
655 output.kv("Device Instance", self.device_instance);
656 output.kv("Objects", self.objects);
657 output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
658 }
659 }
660
661 if is_verbose {
663 let per_type = self.objects / 4;
664 ctx.vprintln(format!(" Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
665 ctx.vprintln(format!(" Device Name: Mabinogion BACnet Simulator"));
666 }
667
668 if is_debug {
670 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
671 ctx.dprintln(format!("Device instance: {}", self.device_instance));
672 ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
673 }
674
675 self.start_server(ctx).await?;
676
677 let per_type = self.objects / 4;
678
679 if !is_quiet {
680 match format {
681 OutputFormat::Table => {
682 let colors_enabled = ctx.colors_enabled();
683 let table = TableBuilder::new(colors_enabled)
684 .header(["Object Type", "Count", "Status"])
685 .status_row(["Device", "1", "Online"], StatusType::Success)
686 .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
687 .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
688 .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
689 .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
690 table.print();
691 }
692 _ => {
693 #[derive(Serialize)]
694 struct BacnetServerInfo {
695 protocol: String,
696 bind_address: String,
697 device_instance: u32,
698 objects: usize,
699 bbmd_enabled: bool,
700 object_types: Vec<ObjectTypeInfo>,
701 status: String,
702 }
703 #[derive(Serialize)]
704 struct ObjectTypeInfo {
705 object_type: String,
706 count: usize,
707 status: String,
708 }
709 let info = BacnetServerInfo {
710 protocol: "BACnet/IP".into(),
711 bind_address: self.bind_addr.to_string(),
712 device_instance: self.device_instance,
713 objects: self.objects,
714 bbmd_enabled: self.bbmd_enabled,
715 object_types: vec![
716 ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
717 ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
718 ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
719 ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
720 ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
721 ],
722 status: "Online".into(),
723 };
724 let _ = ctx.output().write(&info);
725 }
726 }
727 }
728
729 if !is_quiet {
730 ctx.output().info("Press Ctrl+C to stop");
731 }
732 ctx.shutdown_signal().notified().await;
733
734 self.stop_server(ctx).await?;
735 if !is_quiet {
736 ctx.output().success("BACnet simulator stopped");
737 }
738
739 Ok(CommandOutput::quiet_success())
740 }
741}
742
743#[async_trait]
744impl ProtocolCommand for BacnetCommand {
745 fn protocol(&self) -> Protocol {
746 Protocol::BacnetIp
747 }
748
749 fn default_port(&self) -> u16 {
750 47808
751 }
752
753 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
754 let output = ctx.output();
755 let spinner = output.spinner("Starting BACnet server...");
756
757 let config = BacnetServerConfig::new(self.device_instance)
758 .with_bind_addr(self.bind_addr)
759 .with_device_name("Mabinogion BACnet Simulator");
760
761 let registry = ObjectRegistry::new();
763
764 let objects_per_type = self.objects / 4;
765 for i in 0..objects_per_type {
766 let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
767 registry.register(Arc::new(ai));
768 }
769 for i in 0..objects_per_type {
770 let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
771 registry.register(Arc::new(ao));
772 }
773 for i in 0..objects_per_type {
774 let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
775 registry.register(Arc::new(bi));
776 }
777 for i in 0..objects_per_type {
778 let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
779 registry.register(Arc::new(bo));
780 }
781
782 let server = Arc::new(BACnetServer::new(config, registry));
783
784 {
785 let mut server_guard = self.server.lock().await;
786 *server_guard = Some(server.clone());
787 }
788
789 let server_clone = server.clone();
790 let task = tokio::spawn(async move {
791 if let Err(e) = server_clone.run().await {
792 tracing::error!("BACnet server error: {}", e);
793 }
794 });
795
796 {
797 let mut task_guard = self.server_task.lock().await;
798 *task_guard = Some(task);
799 }
800
801 tokio::time::sleep(Duration::from_millis(100)).await;
802
803 spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
804 Ok(())
805 }
806
807 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
808 if let Some(server) = self.server.lock().await.as_ref() {
809 server.shutdown();
810 }
811
812 if let Some(task) = self.server_task.lock().await.take() {
813 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
814 }
815
816 Ok(())
817 }
818}
819
820pub struct KnxCommand {
826 bind_addr: SocketAddr,
827 individual_address: String,
828 group_objects: usize,
829 server: Arc<Mutex<Option<Arc<KnxServer>>>>,
831 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
833}
834
835impl KnxCommand {
836 pub fn new() -> Self {
837 Self {
838 bind_addr: "0.0.0.0:3671".parse().unwrap(),
839 individual_address: "1.1.1".into(),
840 group_objects: 100,
841 server: Arc::new(Mutex::new(None)),
842 server_task: Arc::new(Mutex::new(None)),
843 }
844 }
845
846 pub fn with_port(mut self, port: u16) -> Self {
847 self.bind_addr.set_port(port);
848 self
849 }
850
851 pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
852 self.individual_address = addr.into();
853 self
854 }
855
856 pub fn with_group_objects(mut self, count: usize) -> Self {
857 self.group_objects = count;
858 self
859 }
860}
861
862impl Default for KnxCommand {
863 fn default() -> Self {
864 Self::new()
865 }
866}
867
868#[async_trait]
869impl Command for KnxCommand {
870 fn name(&self) -> &str {
871 "knx"
872 }
873
874 fn description(&self) -> &str {
875 "Start a KNXnet/IP simulator"
876 }
877
878 fn requires_engine(&self) -> bool {
879 true
880 }
881
882 fn supports_shutdown(&self) -> bool {
883 true
884 }
885
886 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
887 let format = ctx.output().format();
888 let is_quiet = ctx.is_quiet();
889 let is_verbose = ctx.is_verbose();
890 let is_debug = ctx.is_debug();
891
892 if !is_quiet {
893 if matches!(format, OutputFormat::Table) {
894 let output = ctx.output();
895 output.header("KNXnet/IP Simulator");
896 output.kv("Bind Address", self.bind_addr);
897 output.kv("Individual Address", &self.individual_address);
898 output.kv("Group Objects", self.group_objects);
899 }
900 }
901
902 if is_verbose {
904 ctx.vprintln(format!(" Max Connections: 10"));
905 ctx.vprintln(format!(" Services: Core, Device Management, Tunneling"));
906 }
907
908 if is_debug {
910 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
911 ctx.dprintln(format!("Individual address: {}", self.individual_address));
912 ctx.dprintln(format!("Group objects: {}", self.group_objects));
913 }
914
915 self.start_server(ctx).await?;
916
917 if !is_quiet {
918 match format {
919 OutputFormat::Table => {
920 let colors_enabled = ctx.colors_enabled();
921 let table = TableBuilder::new(colors_enabled)
922 .header(["Service", "Status"])
923 .status_row(["Core", "Ready"], StatusType::Success)
924 .status_row(["Device Management", "Ready"], StatusType::Success)
925 .status_row(["Tunneling", "Ready"], StatusType::Success);
926 table.print();
927 }
928 _ => {
929 #[derive(Serialize)]
930 struct KnxServerInfo {
931 protocol: String,
932 bind_address: String,
933 individual_address: String,
934 group_objects: usize,
935 services: Vec<ServiceInfo>,
936 status: String,
937 }
938 #[derive(Serialize)]
939 struct ServiceInfo {
940 service: String,
941 status: String,
942 }
943 let info = KnxServerInfo {
944 protocol: "KNXnet/IP".into(),
945 bind_address: self.bind_addr.to_string(),
946 individual_address: self.individual_address.clone(),
947 group_objects: self.group_objects,
948 services: vec![
949 ServiceInfo { service: "Core".into(), status: "Ready".into() },
950 ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
951 ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
952 ],
953 status: "Online".into(),
954 };
955 let _ = ctx.output().write(&info);
956 }
957 }
958 }
959
960 if !is_quiet {
961 ctx.output().info("Press Ctrl+C to stop");
962 }
963 ctx.shutdown_signal().notified().await;
964
965 self.stop_server(ctx).await?;
966 if !is_quiet {
967 ctx.output().success("KNX simulator stopped");
968 }
969
970 Ok(CommandOutput::quiet_success())
971 }
972}
973
974#[async_trait]
975impl ProtocolCommand for KnxCommand {
976 fn protocol(&self) -> Protocol {
977 Protocol::KnxIp
978 }
979
980 fn default_port(&self) -> u16 {
981 3671
982 }
983
984 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
985 let output = ctx.output();
986 let spinner = output.spinner("Starting KNX server...");
987
988 let individual_address: IndividualAddress = self.individual_address.parse()
990 .map_err(|_| crate::error::CliError::ExecutionFailed {
991 message: format!("Invalid individual address: {}", self.individual_address)
992 })?;
993
994 let config = KnxServerConfig {
995 bind_addr: self.bind_addr,
996 individual_address,
997 max_connections: 256,
998 ..Default::default()
999 };
1000
1001 let group_table = Arc::new(GroupObjectTable::new());
1003 let dpt_types = [
1004 DptId::new(1, 1), DptId::new(5, 1), DptId::new(9, 1), DptId::new(9, 4), DptId::new(9, 7), DptId::new(12, 1), DptId::new(13, 1), DptId::new(14, 56), ];
1013 let dpt_names = [
1014 "Switch", "Scaling", "Temperature", "Lux",
1015 "Humidity", "Counter", "SignedCounter", "Float",
1016 ];
1017
1018 for i in 0..self.group_objects {
1019 let main = ((i / 256) + 1) as u8;
1020 let middle = ((i / 8) % 8) as u8;
1021 let sub = (i % 256) as u8;
1022 let addr = GroupAddress::three_level(main, middle, sub);
1023 let dpt_idx = i % dpt_types.len();
1024 let name = format!("{}_{}", dpt_names[dpt_idx], i);
1025 if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1026 tracing::warn!("Failed to create group object {}: {}", i, e);
1027 }
1028 }
1029
1030 let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1031
1032 {
1033 let mut server_guard = self.server.lock().await;
1034 *server_guard = Some(server.clone());
1035 }
1036
1037 let server_clone = server.clone();
1038 let task = tokio::spawn(async move {
1039 if let Err(e) = server_clone.start().await {
1040 tracing::error!("KNX server error: {}", e);
1041 }
1042 });
1043
1044 {
1045 let mut task_guard = self.server_task.lock().await;
1046 *task_guard = Some(task);
1047 }
1048
1049 tokio::time::sleep(Duration::from_millis(100)).await;
1050
1051 spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1052 Ok(())
1053 }
1054
1055 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1056 let server_opt = self.server.lock().await.take();
1058 if let Some(server) = server_opt {
1059 let _ = tokio::task::spawn_blocking(move || {
1061 let rt = tokio::runtime::Handle::current();
1062 rt.block_on(async {
1063 let _ = server.stop().await;
1064 })
1065 }).await;
1066 }
1067
1068 if let Some(task) = self.server_task.lock().await.take() {
1069 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1070 }
1071
1072 Ok(())
1073 }
1074}