1use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use serde::Serialize;
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::Mutex;
16use tokio::task::JoinHandle;
17
18use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
20use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
21use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
22use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress};
23
#[async_trait]
/// A [`Command`] that runs a protocol simulator server.
///
/// Implementors in this file call `start_server` from `execute`, wait for the
/// context's shutdown signal, and then call `stop_server`.
pub trait ProtocolCommand: Command {
    /// Returns the [`Protocol`] variant this command simulates.
    fn protocol(&self) -> Protocol;

    /// Returns the default port number for the protocol.
    fn default_port(&self) -> u16;

    /// Creates the server and starts running it.
    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;

    /// Signals the running server to stop and waits for its task to end.
    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
}
39
/// Command that runs a Modbus simulator.
pub struct ModbusCommand {
    /// Address handed to the TCP server configuration as its bind address.
    bind_addr: SocketAddr,
    /// Number of simulated devices; device `i` gets unit ID `i + 1`.
    devices: usize,
    /// Total points per device; divided by four to size each of the holding
    /// register, input register, coil and discrete input tables.
    points_per_device: usize,
    /// When set, output is labelled "RTU" and `protocol()` reports `ModbusRtu`.
    rtu_mode: bool,
    /// Serial port name shown in RTU mode ("N/A" is displayed when absent).
    serial_port: Option<String>,
    /// Server instance stored by `start_server` so `stop_server` can shut it down.
    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
    /// Handle of the tokio task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
61
62impl ModbusCommand {
63 pub fn new() -> Self {
64 Self {
65 bind_addr: "0.0.0.0:502".parse().unwrap(),
66 devices: 1,
67 points_per_device: 100,
68 rtu_mode: false,
69 serial_port: None,
70 server: Arc::new(Mutex::new(None)),
71 server_task: Arc::new(Mutex::new(None)),
72 }
73 }
74
75 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
76 self.bind_addr = addr;
77 self
78 }
79
80 pub fn with_port(mut self, port: u16) -> Self {
81 self.bind_addr.set_port(port);
82 self
83 }
84
85 pub fn with_devices(mut self, devices: usize) -> Self {
86 self.devices = devices;
87 self
88 }
89
90 pub fn with_points(mut self, points: usize) -> Self {
91 self.points_per_device = points;
92 self
93 }
94
95 pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
96 self.rtu_mode = true;
97 self.serial_port = Some(serial_port.into());
98 self
99 }
100}
101
102impl Default for ModbusCommand {
103 fn default() -> Self {
104 Self::new()
105 }
106}
107
108#[async_trait]
109impl Command for ModbusCommand {
110 fn name(&self) -> &str {
111 "modbus"
112 }
113
114 fn description(&self) -> &str {
115 "Start a Modbus TCP/RTU simulator"
116 }
117
118 fn requires_engine(&self) -> bool {
119 true
120 }
121
122 fn supports_shutdown(&self) -> bool {
123 true
124 }
125
126 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
127 let format = ctx.output().format();
128 let is_quiet = ctx.is_quiet();
129 let is_verbose = ctx.is_verbose();
130 let is_debug = ctx.is_debug();
131
132 if !is_quiet {
133 if matches!(format, OutputFormat::Table) {
134 let output = ctx.output();
135 if self.rtu_mode {
136 output.header("Modbus RTU Simulator");
137 output.kv(
138 "Serial Port",
139 self.serial_port.as_deref().unwrap_or("N/A"),
140 );
141 } else {
142 output.header("Modbus TCP Simulator");
143 output.kv("Bind Address", self.bind_addr);
144 }
145 output.kv("Devices", self.devices);
146 output.kv("Points per Device", self.points_per_device);
147 output.kv("Total Points", self.devices * self.points_per_device);
148 }
149 }
150
151 if is_verbose {
153 ctx.vprintln(format!(" Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
154 ctx.vprintln(format!(" Points Distribution: {} per register type", self.points_per_device / 4));
155 }
156
157 if is_debug {
159 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
160 ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
161 ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
162 }
163
164 self.start_server(ctx).await?;
165
166 let points_per_type = self.points_per_device / 4;
167
168 if !is_quiet {
169 match format {
170 OutputFormat::Table => {
171 let colors_enabled = ctx.colors_enabled();
172 let builder = TableBuilder::new(colors_enabled)
173 .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
174
175 let devices = self.devices;
176 let pts = points_per_type.to_string();
177 let table = PaginatedTable::default().render(builder, devices, 6, |i| {
178 let unit_id = (i + 1).to_string();
179 (
180 vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
181 StatusType::Success,
182 )
183 });
184 table.print();
185 }
186 _ => {
187 #[derive(Serialize)]
188 struct ModbusServerInfo {
189 protocol: String,
190 bind_address: String,
191 devices: usize,
192 points_per_device: usize,
193 total_points: usize,
194 rtu_mode: bool,
195 serial_port: Option<String>,
196 device_list: Vec<ModbusDeviceInfo>,
197 status: String,
198 }
199 #[derive(Serialize)]
200 struct ModbusDeviceInfo {
201 unit_id: usize,
202 holding_registers: usize,
203 input_registers: usize,
204 coils: usize,
205 discrete_inputs: usize,
206 status: String,
207 }
208 let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
209 .map(|i| ModbusDeviceInfo {
210 unit_id: i + 1,
211 holding_registers: points_per_type,
212 input_registers: points_per_type,
213 coils: points_per_type,
214 discrete_inputs: points_per_type,
215 status: "Online".into(),
216 })
217 .collect();
218 let info = ModbusServerInfo {
219 protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
220 bind_address: self.bind_addr.to_string(),
221 devices: self.devices,
222 points_per_device: self.points_per_device,
223 total_points: self.devices * self.points_per_device,
224 rtu_mode: self.rtu_mode,
225 serial_port: self.serial_port.clone(),
226 device_list,
227 status: "Online".into(),
228 };
229 let _ = ctx.output().write(&info);
230 }
231 }
232 }
233
234 if !is_quiet {
235 ctx.output().info("Press Ctrl+C to stop");
236 }
237 ctx.shutdown_signal().notified().await;
238
239 self.stop_server(ctx).await?;
240 if !is_quiet {
241 ctx.output().success("Modbus simulator stopped");
242 }
243
244 Ok(CommandOutput::quiet_success())
245 }
246}
247
248#[async_trait]
249impl ProtocolCommand for ModbusCommand {
250 fn protocol(&self) -> Protocol {
251 if self.rtu_mode {
252 Protocol::ModbusRtu
253 } else {
254 Protocol::ModbusTcp
255 }
256 }
257
258 fn default_port(&self) -> u16 {
259 502
260 }
261
262 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
263 let output = ctx.output();
264 let spinner = output.spinner("Starting Modbus server...");
265
266 let config = ServerConfigV2 {
267 bind_address: self.bind_addr,
268 ..Default::default()
269 };
270
271 let server = Arc::new(ModbusTcpServerV2::new(config));
272
273 for i in 0..self.devices {
274 let unit_id = (i + 1) as u8;
275 let points = (self.points_per_device / 4) as u16;
276 let device_config = ModbusDeviceConfig {
277 unit_id,
278 name: format!("Device-{}", unit_id),
279 holding_registers: points,
280 input_registers: points,
281 coils: points,
282 discrete_inputs: points,
283 response_delay_ms: 0,
284 };
285 let device = ModbusDevice::new(device_config);
286 server.add_device(device);
287 }
288
289 {
290 let mut server_guard = self.server.lock().await;
291 *server_guard = Some(server.clone());
292 }
293
294 let server_clone = server.clone();
295 let task = tokio::spawn(async move {
296 if let Err(e) = server_clone.run().await {
297 tracing::error!("Modbus server error: {}", e);
298 }
299 });
300
301 {
302 let mut task_guard = self.server_task.lock().await;
303 *task_guard = Some(task);
304 }
305
306 tokio::time::sleep(Duration::from_millis(100)).await;
307
308 spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
309 Ok(())
310 }
311
312 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
313 if let Some(server) = self.server.lock().await.as_ref() {
314 server.shutdown();
315 }
316
317 if let Some(task) = self.server_task.lock().await.take() {
318 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
319 }
320
321 Ok(())
322 }
323}
324
/// Command that runs an OPC UA server simulator.
pub struct OpcuaCommand {
    /// Host and port used to build the `opc.tcp://` endpoint URL.
    bind_addr: SocketAddr,
    /// Path appended to the bind address in the endpoint URL.
    endpoint_path: String,
    /// Requested node count; `start_server` adds at most 100 sample variables.
    nodes: usize,
    /// Security mode label; only displayed and reported in this file.
    security_mode: String,
    /// Server instance stored by `start_server` so `stop_server` can stop it.
    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
    /// Handle of the tokio task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
340
341impl OpcuaCommand {
342 pub fn new() -> Self {
343 Self {
344 bind_addr: "0.0.0.0:4840".parse().unwrap(),
345 endpoint_path: "/".into(),
346 nodes: 1000,
347 security_mode: "None".into(),
348 server: Arc::new(Mutex::new(None)),
349 server_task: Arc::new(Mutex::new(None)),
350 }
351 }
352
353 pub fn with_port(mut self, port: u16) -> Self {
354 self.bind_addr.set_port(port);
355 self
356 }
357
358 pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
359 self.endpoint_path = path.into();
360 self
361 }
362
363 pub fn with_nodes(mut self, nodes: usize) -> Self {
364 self.nodes = nodes;
365 self
366 }
367
368 pub fn with_security(mut self, mode: impl Into<String>) -> Self {
369 self.security_mode = mode.into();
370 self
371 }
372}
373
374impl Default for OpcuaCommand {
375 fn default() -> Self {
376 Self::new()
377 }
378}
379
380#[async_trait]
381impl Command for OpcuaCommand {
382 fn name(&self) -> &str {
383 "opcua"
384 }
385
386 fn description(&self) -> &str {
387 "Start an OPC UA server simulator"
388 }
389
390 fn requires_engine(&self) -> bool {
391 true
392 }
393
394 fn supports_shutdown(&self) -> bool {
395 true
396 }
397
398 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
399 let format = ctx.output().format();
400 let is_quiet = ctx.is_quiet();
401 let is_verbose = ctx.is_verbose();
402 let is_debug = ctx.is_debug();
403
404 if !is_quiet {
405 if matches!(format, OutputFormat::Table) {
406 let output = ctx.output();
407 output.header("OPC UA Simulator");
408 output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
409 output.kv("Nodes", self.nodes);
410 output.kv("Security Mode", &self.security_mode);
411 }
412 }
413
414 if is_verbose {
416 ctx.vprintln(format!(" Bind Address: {}", self.bind_addr));
417 ctx.vprintln(format!(" Endpoint Path: {}", self.endpoint_path));
418 ctx.vprintln(format!(" Max Subscriptions: 1000"));
419 ctx.vprintln(format!(" Max Monitored Items: 10000"));
420 }
421
422 if is_debug {
424 ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
425 ctx.dprintln(format!("Node count: {}", self.nodes));
426 ctx.dprintln(format!("Security mode: {}", self.security_mode));
427 ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
428 }
429
430 self.start_server(ctx).await?;
431
432 if !is_quiet {
433 match format {
434 OutputFormat::Table => {
435 let colors_enabled = ctx.colors_enabled();
436 let table = TableBuilder::new(colors_enabled)
437 .header(["Namespace", "Nodes", "Subscriptions", "Status"])
438 .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
439 .status_row(
440 ["1", &self.nodes.to_string(), "0", "Online"],
441 StatusType::Success,
442 );
443 table.print();
444 }
445 _ => {
446 #[derive(Serialize)]
447 struct OpcuaServerInfo {
448 protocol: String,
449 endpoint: String,
450 nodes: usize,
451 security_mode: String,
452 namespaces: Vec<NamespaceInfo>,
453 status: String,
454 }
455 #[derive(Serialize)]
456 struct NamespaceInfo {
457 index: u32,
458 nodes: String,
459 subscriptions: u32,
460 status: String,
461 }
462 let info = OpcuaServerInfo {
463 protocol: "OPC UA".into(),
464 endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
465 nodes: self.nodes,
466 security_mode: self.security_mode.clone(),
467 namespaces: vec![
468 NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
469 NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
470 ],
471 status: "Online".into(),
472 };
473 let _ = ctx.output().write(&info);
474 }
475 }
476 }
477
478 if !is_quiet {
479 ctx.output().info("Press Ctrl+C to stop");
480 }
481 ctx.shutdown_signal().notified().await;
482
483 self.stop_server(ctx).await?;
484 if !is_quiet {
485 ctx.output().success("OPC UA simulator stopped");
486 }
487
488 Ok(CommandOutput::quiet_success())
489 }
490}
491
492#[async_trait]
493impl ProtocolCommand for OpcuaCommand {
494 fn protocol(&self) -> Protocol {
495 Protocol::OpcUa
496 }
497
498 fn default_port(&self) -> u16 {
499 4840
500 }
501
502 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
503 let output = ctx.output();
504 let spinner = output.spinner("Starting OPC UA server...");
505
506 let config = OpcUaServerConfig {
507 endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
508 server_name: "Mabinogion OPC UA Simulator".to_string(),
509 max_subscriptions: 1000,
510 max_monitored_items: 10000,
511 ..Default::default()
512 };
513
514 let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
515 crate::error::CliError::ExecutionFailed {
516 message: format!("Failed to create OPC UA server: {}", e)
517 }
518 })?);
519
520 for i in 0..self.nodes.min(100) {
522 let node_id = format!("ns=2;i={}", 1000 + i);
523 let _ = server.add_variable(node_id, format!("Variable_{}", i), (i as f64) * 0.1);
524 }
525
526 {
527 let mut server_guard = self.server.lock().await;
528 *server_guard = Some(server.clone());
529 }
530
531 let server_clone = server.clone();
532 let task = tokio::spawn(async move {
533 if let Err(e) = server_clone.start().await {
534 tracing::error!("OPC UA server error: {}", e);
535 }
536 });
537
538 {
539 let mut task_guard = self.server_task.lock().await;
540 *task_guard = Some(task);
541 }
542
543 tokio::time::sleep(Duration::from_millis(100)).await;
544
545 spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
546 Ok(())
547 }
548
549 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
550 if let Some(server) = self.server.lock().await.as_ref() {
551 let _ = server.stop().await;
552 }
553
554 if let Some(task) = self.server_task.lock().await.take() {
555 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
556 }
557
558 Ok(())
559 }
560}
561
/// Command that runs a BACnet/IP simulator.
pub struct BacnetCommand {
    /// Address passed to the server configuration as its bind address.
    bind_addr: SocketAddr,
    /// Device instance number passed to the server configuration.
    device_instance: u32,
    /// Total object count; divided by four across analog/binary inputs and outputs.
    objects: usize,
    /// BBMD flag; only displayed and reported in this file.
    bbmd_enabled: bool,
    /// Server instance stored by `start_server` so `stop_server` can shut it down.
    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
    /// Handle of the tokio task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
577
578impl BacnetCommand {
579 pub fn new() -> Self {
580 Self {
581 bind_addr: "0.0.0.0:47808".parse().unwrap(),
582 device_instance: 1234,
583 objects: 100,
584 bbmd_enabled: false,
585 server: Arc::new(Mutex::new(None)),
586 server_task: Arc::new(Mutex::new(None)),
587 }
588 }
589
590 pub fn with_port(mut self, port: u16) -> Self {
591 self.bind_addr.set_port(port);
592 self
593 }
594
595 pub fn with_device_instance(mut self, instance: u32) -> Self {
596 self.device_instance = instance;
597 self
598 }
599
600 pub fn with_objects(mut self, objects: usize) -> Self {
601 self.objects = objects;
602 self
603 }
604
605 pub fn with_bbmd(mut self, enabled: bool) -> Self {
606 self.bbmd_enabled = enabled;
607 self
608 }
609}
610
611impl Default for BacnetCommand {
612 fn default() -> Self {
613 Self::new()
614 }
615}
616
617#[async_trait]
618impl Command for BacnetCommand {
619 fn name(&self) -> &str {
620 "bacnet"
621 }
622
623 fn description(&self) -> &str {
624 "Start a BACnet/IP simulator"
625 }
626
627 fn requires_engine(&self) -> bool {
628 true
629 }
630
631 fn supports_shutdown(&self) -> bool {
632 true
633 }
634
635 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
636 let format = ctx.output().format();
637 let is_quiet = ctx.is_quiet();
638 let is_verbose = ctx.is_verbose();
639 let is_debug = ctx.is_debug();
640
641 if !is_quiet {
642 if matches!(format, OutputFormat::Table) {
643 let output = ctx.output();
644 output.header("BACnet/IP Simulator");
645 output.kv("Bind Address", self.bind_addr);
646 output.kv("Device Instance", self.device_instance);
647 output.kv("Objects", self.objects);
648 output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
649 }
650 }
651
652 if is_verbose {
654 let per_type = self.objects / 4;
655 ctx.vprintln(format!(" Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
656 ctx.vprintln(format!(" Device Name: Mabinogion BACnet Simulator"));
657 }
658
659 if is_debug {
661 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
662 ctx.dprintln(format!("Device instance: {}", self.device_instance));
663 ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
664 }
665
666 self.start_server(ctx).await?;
667
668 let per_type = self.objects / 4;
669
670 if !is_quiet {
671 match format {
672 OutputFormat::Table => {
673 let colors_enabled = ctx.colors_enabled();
674 let table = TableBuilder::new(colors_enabled)
675 .header(["Object Type", "Count", "Status"])
676 .status_row(["Device", "1", "Online"], StatusType::Success)
677 .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
678 .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
679 .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
680 .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
681 table.print();
682 }
683 _ => {
684 #[derive(Serialize)]
685 struct BacnetServerInfo {
686 protocol: String,
687 bind_address: String,
688 device_instance: u32,
689 objects: usize,
690 bbmd_enabled: bool,
691 object_types: Vec<ObjectTypeInfo>,
692 status: String,
693 }
694 #[derive(Serialize)]
695 struct ObjectTypeInfo {
696 object_type: String,
697 count: usize,
698 status: String,
699 }
700 let info = BacnetServerInfo {
701 protocol: "BACnet/IP".into(),
702 bind_address: self.bind_addr.to_string(),
703 device_instance: self.device_instance,
704 objects: self.objects,
705 bbmd_enabled: self.bbmd_enabled,
706 object_types: vec![
707 ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
708 ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
709 ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
710 ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
711 ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
712 ],
713 status: "Online".into(),
714 };
715 let _ = ctx.output().write(&info);
716 }
717 }
718 }
719
720 if !is_quiet {
721 ctx.output().info("Press Ctrl+C to stop");
722 }
723 ctx.shutdown_signal().notified().await;
724
725 self.stop_server(ctx).await?;
726 if !is_quiet {
727 ctx.output().success("BACnet simulator stopped");
728 }
729
730 Ok(CommandOutput::quiet_success())
731 }
732}
733
734#[async_trait]
735impl ProtocolCommand for BacnetCommand {
736 fn protocol(&self) -> Protocol {
737 Protocol::BacnetIp
738 }
739
740 fn default_port(&self) -> u16 {
741 47808
742 }
743
744 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
745 let output = ctx.output();
746 let spinner = output.spinner("Starting BACnet server...");
747
748 let config = BacnetServerConfig::new(self.device_instance)
749 .with_bind_addr(self.bind_addr)
750 .with_device_name("Mabinogion BACnet Simulator");
751
752 let registry = ObjectRegistry::new();
754
755 let objects_per_type = self.objects / 4;
756 for i in 0..objects_per_type {
757 let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
758 registry.register(Arc::new(ai));
759 }
760 for i in 0..objects_per_type {
761 let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
762 registry.register(Arc::new(ao));
763 }
764 for i in 0..objects_per_type {
765 let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
766 registry.register(Arc::new(bi));
767 }
768 for i in 0..objects_per_type {
769 let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
770 registry.register(Arc::new(bo));
771 }
772
773 let server = Arc::new(BACnetServer::new(config, registry));
774
775 {
776 let mut server_guard = self.server.lock().await;
777 *server_guard = Some(server.clone());
778 }
779
780 let server_clone = server.clone();
781 let task = tokio::spawn(async move {
782 if let Err(e) = server_clone.run().await {
783 tracing::error!("BACnet server error: {}", e);
784 }
785 });
786
787 {
788 let mut task_guard = self.server_task.lock().await;
789 *task_guard = Some(task);
790 }
791
792 tokio::time::sleep(Duration::from_millis(100)).await;
793
794 spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
795 Ok(())
796 }
797
798 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
799 if let Some(server) = self.server.lock().await.as_ref() {
800 server.shutdown();
801 }
802
803 if let Some(task) = self.server_task.lock().await.take() {
804 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
805 }
806
807 Ok(())
808 }
809}
810
/// Command that runs a KNXnet/IP simulator.
pub struct KnxCommand {
    /// Address passed to the server configuration as its bind address.
    bind_addr: SocketAddr,
    /// Individual address in text form; parsed into `IndividualAddress` at start.
    individual_address: String,
    /// Group object count; only displayed and reported in this file.
    group_objects: usize,
    /// Server instance stored by `start_server`; taken out again by `stop_server`.
    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
    /// Handle of the tokio task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
825
826impl KnxCommand {
827 pub fn new() -> Self {
828 Self {
829 bind_addr: "0.0.0.0:3671".parse().unwrap(),
830 individual_address: "1.1.1".into(),
831 group_objects: 100,
832 server: Arc::new(Mutex::new(None)),
833 server_task: Arc::new(Mutex::new(None)),
834 }
835 }
836
837 pub fn with_port(mut self, port: u16) -> Self {
838 self.bind_addr.set_port(port);
839 self
840 }
841
842 pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
843 self.individual_address = addr.into();
844 self
845 }
846
847 pub fn with_group_objects(mut self, count: usize) -> Self {
848 self.group_objects = count;
849 self
850 }
851}
852
853impl Default for KnxCommand {
854 fn default() -> Self {
855 Self::new()
856 }
857}
858
859#[async_trait]
860impl Command for KnxCommand {
861 fn name(&self) -> &str {
862 "knx"
863 }
864
865 fn description(&self) -> &str {
866 "Start a KNXnet/IP simulator"
867 }
868
869 fn requires_engine(&self) -> bool {
870 true
871 }
872
873 fn supports_shutdown(&self) -> bool {
874 true
875 }
876
877 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
878 let format = ctx.output().format();
879 let is_quiet = ctx.is_quiet();
880 let is_verbose = ctx.is_verbose();
881 let is_debug = ctx.is_debug();
882
883 if !is_quiet {
884 if matches!(format, OutputFormat::Table) {
885 let output = ctx.output();
886 output.header("KNXnet/IP Simulator");
887 output.kv("Bind Address", self.bind_addr);
888 output.kv("Individual Address", &self.individual_address);
889 output.kv("Group Objects", self.group_objects);
890 }
891 }
892
893 if is_verbose {
895 ctx.vprintln(format!(" Max Connections: 10"));
896 ctx.vprintln(format!(" Services: Core, Device Management, Tunneling"));
897 }
898
899 if is_debug {
901 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
902 ctx.dprintln(format!("Individual address: {}", self.individual_address));
903 ctx.dprintln(format!("Group objects: {}", self.group_objects));
904 }
905
906 self.start_server(ctx).await?;
907
908 if !is_quiet {
909 match format {
910 OutputFormat::Table => {
911 let colors_enabled = ctx.colors_enabled();
912 let table = TableBuilder::new(colors_enabled)
913 .header(["Service", "Status"])
914 .status_row(["Core", "Ready"], StatusType::Success)
915 .status_row(["Device Management", "Ready"], StatusType::Success)
916 .status_row(["Tunneling", "Ready"], StatusType::Success);
917 table.print();
918 }
919 _ => {
920 #[derive(Serialize)]
921 struct KnxServerInfo {
922 protocol: String,
923 bind_address: String,
924 individual_address: String,
925 group_objects: usize,
926 services: Vec<ServiceInfo>,
927 status: String,
928 }
929 #[derive(Serialize)]
930 struct ServiceInfo {
931 service: String,
932 status: String,
933 }
934 let info = KnxServerInfo {
935 protocol: "KNXnet/IP".into(),
936 bind_address: self.bind_addr.to_string(),
937 individual_address: self.individual_address.clone(),
938 group_objects: self.group_objects,
939 services: vec![
940 ServiceInfo { service: "Core".into(), status: "Ready".into() },
941 ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
942 ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
943 ],
944 status: "Online".into(),
945 };
946 let _ = ctx.output().write(&info);
947 }
948 }
949 }
950
951 if !is_quiet {
952 ctx.output().info("Press Ctrl+C to stop");
953 }
954 ctx.shutdown_signal().notified().await;
955
956 self.stop_server(ctx).await?;
957 if !is_quiet {
958 ctx.output().success("KNX simulator stopped");
959 }
960
961 Ok(CommandOutput::quiet_success())
962 }
963}
964
965#[async_trait]
966impl ProtocolCommand for KnxCommand {
967 fn protocol(&self) -> Protocol {
968 Protocol::KnxIp
969 }
970
971 fn default_port(&self) -> u16 {
972 3671
973 }
974
975 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
976 let output = ctx.output();
977 let spinner = output.spinner("Starting KNX server...");
978
979 let individual_address: IndividualAddress = self.individual_address.parse()
981 .map_err(|_| crate::error::CliError::ExecutionFailed {
982 message: format!("Invalid individual address: {}", self.individual_address)
983 })?;
984
985 let config = KnxServerConfig {
986 bind_addr: self.bind_addr,
987 individual_address,
988 max_connections: 10,
989 ..Default::default()
990 };
991
992 let server = Arc::new(KnxServer::new(config));
993
994 {
995 let mut server_guard = self.server.lock().await;
996 *server_guard = Some(server.clone());
997 }
998
999 let server_clone = server.clone();
1000 let task = tokio::spawn(async move {
1001 if let Err(e) = server_clone.start().await {
1002 tracing::error!("KNX server error: {}", e);
1003 }
1004 });
1005
1006 {
1007 let mut task_guard = self.server_task.lock().await;
1008 *task_guard = Some(task);
1009 }
1010
1011 tokio::time::sleep(Duration::from_millis(100)).await;
1012
1013 spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1014 Ok(())
1015 }
1016
1017 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1018 let server_opt = self.server.lock().await.take();
1020 if let Some(server) = server_opt {
1021 let _ = tokio::task::spawn_blocking(move || {
1023 let rt = tokio::runtime::Handle::current();
1024 rt.block_on(async {
1025 let _ = server.stop().await;
1026 })
1027 }).await;
1028 }
1029
1030 if let Some(task) = self.server_task.lock().await.take() {
1031 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1032 }
1033
1034 Ok(())
1035 }
1036}