1use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
21use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
22use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
23use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
24
25#[async_trait]
27pub trait ProtocolCommand: Command {
28 fn protocol(&self) -> Protocol;
30
31 fn default_port(&self) -> u16;
33
34 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
36
37 async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
39}
40
41pub struct ModbusCommand {
47 bind_addr: SocketAddr,
49 devices: usize,
51 points_per_device: usize,
53 rtu_mode: bool,
55 serial_port: Option<String>,
57 tags: Tags,
59 server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
61 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
63}
64
65impl ModbusCommand {
66 pub fn new() -> Self {
67 Self {
68 bind_addr: "0.0.0.0:502".parse().unwrap(),
69 devices: 1,
70 points_per_device: 100,
71 rtu_mode: false,
72 serial_port: None,
73 tags: Tags::new(),
74 server: Arc::new(Mutex::new(None)),
75 server_task: Arc::new(Mutex::new(None)),
76 }
77 }
78
79 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
80 self.bind_addr = addr;
81 self
82 }
83
84 pub fn with_port(mut self, port: u16) -> Self {
85 self.bind_addr.set_port(port);
86 self
87 }
88
89 pub fn with_devices(mut self, devices: usize) -> Self {
90 self.devices = devices;
91 self
92 }
93
94 pub fn with_points(mut self, points: usize) -> Self {
95 self.points_per_device = points;
96 self
97 }
98
99 pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
100 self.rtu_mode = true;
101 self.serial_port = Some(serial_port.into());
102 self
103 }
104
105 pub fn with_tags(mut self, tags: Tags) -> Self {
106 self.tags = tags;
107 self
108 }
109}
110
111impl Default for ModbusCommand {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
117#[async_trait]
118impl Command for ModbusCommand {
119 fn name(&self) -> &str {
120 "modbus"
121 }
122
123 fn description(&self) -> &str {
124 "Start a Modbus TCP/RTU simulator"
125 }
126
127 fn requires_engine(&self) -> bool {
128 true
129 }
130
131 fn supports_shutdown(&self) -> bool {
132 true
133 }
134
135 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
136 let format = ctx.output().format();
137 let is_quiet = ctx.is_quiet();
138 let is_verbose = ctx.is_verbose();
139 let is_debug = ctx.is_debug();
140
141 if !is_quiet {
142 if matches!(format, OutputFormat::Table) {
143 let output = ctx.output();
144 if self.rtu_mode {
145 output.header("Modbus RTU Simulator");
146 output.kv(
147 "Serial Port",
148 self.serial_port.as_deref().unwrap_or("N/A"),
149 );
150 } else {
151 output.header("Modbus TCP Simulator");
152 output.kv("Bind Address", self.bind_addr);
153 }
154 output.kv("Devices", self.devices);
155 output.kv("Points per Device", self.points_per_device);
156 output.kv("Total Points", self.devices * self.points_per_device);
157 }
158 }
159
160 if is_verbose {
162 ctx.vprintln(format!(" Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
163 ctx.vprintln(format!(" Points Distribution: {} per register type", self.points_per_device / 4));
164 }
165
166 if is_debug {
168 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
169 ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
170 ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
171 }
172
173 self.start_server(ctx).await?;
174
175 let points_per_type = self.points_per_device / 4;
176
177 if !is_quiet {
178 match format {
179 OutputFormat::Table => {
180 let colors_enabled = ctx.colors_enabled();
181 let builder = TableBuilder::new(colors_enabled)
182 .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
183
184 let devices = self.devices;
185 let pts = points_per_type.to_string();
186 let table = PaginatedTable::default().render(builder, devices, 6, |i| {
187 let unit_id = (i + 1).to_string();
188 (
189 vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
190 StatusType::Success,
191 )
192 });
193 table.print();
194 }
195 _ => {
196 #[derive(Serialize)]
197 struct ModbusServerInfo {
198 protocol: String,
199 bind_address: String,
200 devices: usize,
201 points_per_device: usize,
202 total_points: usize,
203 rtu_mode: bool,
204 serial_port: Option<String>,
205 device_list: Vec<ModbusDeviceInfo>,
206 status: String,
207 }
208 #[derive(Serialize)]
209 struct ModbusDeviceInfo {
210 unit_id: usize,
211 holding_registers: usize,
212 input_registers: usize,
213 coils: usize,
214 discrete_inputs: usize,
215 status: String,
216 }
217 let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
218 .map(|i| ModbusDeviceInfo {
219 unit_id: i + 1,
220 holding_registers: points_per_type,
221 input_registers: points_per_type,
222 coils: points_per_type,
223 discrete_inputs: points_per_type,
224 status: "Online".into(),
225 })
226 .collect();
227 let info = ModbusServerInfo {
228 protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
229 bind_address: self.bind_addr.to_string(),
230 devices: self.devices,
231 points_per_device: self.points_per_device,
232 total_points: self.devices * self.points_per_device,
233 rtu_mode: self.rtu_mode,
234 serial_port: self.serial_port.clone(),
235 device_list,
236 status: "Online".into(),
237 };
238 let _ = ctx.output().write(&info);
239 }
240 }
241 }
242
243 if !is_quiet {
244 ctx.output().info("Press Ctrl+C to stop");
245 }
246 ctx.shutdown_signal().notified().await;
247
248 self.stop_server(ctx).await?;
249 if !is_quiet {
250 ctx.output().success("Modbus simulator stopped");
251 }
252
253 Ok(CommandOutput::quiet_success())
254 }
255}
256
257#[async_trait]
258impl ProtocolCommand for ModbusCommand {
259 fn protocol(&self) -> Protocol {
260 if self.rtu_mode {
261 Protocol::ModbusRtu
262 } else {
263 Protocol::ModbusTcp
264 }
265 }
266
267 fn default_port(&self) -> u16 {
268 502
269 }
270
271 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
272 let output = ctx.output();
273 let spinner = output.spinner("Starting Modbus server...");
274
275 let config = ServerConfigV2 {
276 bind_address: self.bind_addr,
277 ..Default::default()
278 };
279
280 let server = Arc::new(ModbusTcpServerV2::new(config));
281
282 for i in 0..self.devices {
283 let unit_id = (i + 1) as u8;
284 let points = (self.points_per_device / 4) as u16;
285 let device_config = ModbusDeviceConfig {
286 unit_id,
287 name: format!("Device-{}", unit_id),
288 holding_registers: points,
289 input_registers: points,
290 coils: points,
291 discrete_inputs: points,
292 response_delay_ms: 0,
293 tags: self.tags.clone(),
294 };
295 let device = ModbusDevice::new(device_config);
296 server.add_device(device);
297 }
298
299 {
300 let mut server_guard = self.server.lock().await;
301 *server_guard = Some(server.clone());
302 }
303
304 let server_clone = server.clone();
305 let task = tokio::spawn(async move {
306 if let Err(e) = server_clone.run().await {
307 tracing::error!("Modbus server error: {}", e);
308 }
309 });
310
311 {
312 let mut task_guard = self.server_task.lock().await;
313 *task_guard = Some(task);
314 }
315
316 tokio::time::sleep(Duration::from_millis(100)).await;
317
318 spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
319 Ok(())
320 }
321
322 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
323 if let Some(server) = self.server.lock().await.as_ref() {
324 server.shutdown();
325 }
326
327 if let Some(task) = self.server_task.lock().await.take() {
328 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
329 }
330
331 Ok(())
332 }
333}
334
335pub struct OpcuaCommand {
341 bind_addr: SocketAddr,
342 endpoint_path: String,
343 nodes: usize,
344 security_mode: String,
345 tags: Tags,
347 server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
349 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
351}
352
353impl OpcuaCommand {
354 pub fn new() -> Self {
355 Self {
356 bind_addr: "0.0.0.0:4840".parse().unwrap(),
357 endpoint_path: "/".into(),
358 nodes: 1000,
359 security_mode: "None".into(),
360 tags: Tags::new(),
361 server: Arc::new(Mutex::new(None)),
362 server_task: Arc::new(Mutex::new(None)),
363 }
364 }
365
366 pub fn with_port(mut self, port: u16) -> Self {
367 self.bind_addr.set_port(port);
368 self
369 }
370
371 pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
372 self.endpoint_path = path.into();
373 self
374 }
375
376 pub fn with_nodes(mut self, nodes: usize) -> Self {
377 self.nodes = nodes;
378 self
379 }
380
381 pub fn with_security(mut self, mode: impl Into<String>) -> Self {
382 self.security_mode = mode.into();
383 self
384 }
385
386 pub fn with_tags(mut self, tags: Tags) -> Self {
387 self.tags = tags;
388 self
389 }
390}
391
392impl Default for OpcuaCommand {
393 fn default() -> Self {
394 Self::new()
395 }
396}
397
398#[async_trait]
399impl Command for OpcuaCommand {
400 fn name(&self) -> &str {
401 "opcua"
402 }
403
404 fn description(&self) -> &str {
405 "Start an OPC UA server simulator"
406 }
407
408 fn requires_engine(&self) -> bool {
409 true
410 }
411
412 fn supports_shutdown(&self) -> bool {
413 true
414 }
415
416 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
417 let format = ctx.output().format();
418 let is_quiet = ctx.is_quiet();
419 let is_verbose = ctx.is_verbose();
420 let is_debug = ctx.is_debug();
421
422 if !is_quiet {
423 if matches!(format, OutputFormat::Table) {
424 let output = ctx.output();
425 output.header("OPC UA Simulator");
426 output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
427 output.kv("Nodes", self.nodes);
428 output.kv("Security Mode", &self.security_mode);
429 }
430 }
431
432 if is_verbose {
434 ctx.vprintln(format!(" Bind Address: {}", self.bind_addr));
435 ctx.vprintln(format!(" Endpoint Path: {}", self.endpoint_path));
436 ctx.vprintln(format!(" Max Subscriptions: 1000"));
437 ctx.vprintln(format!(" Max Monitored Items: 10000"));
438 }
439
440 if is_debug {
442 ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
443 ctx.dprintln(format!("Node count: {}", self.nodes));
444 ctx.dprintln(format!("Security mode: {}", self.security_mode));
445 ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
446 }
447
448 self.start_server(ctx).await?;
449
450 if !is_quiet {
451 match format {
452 OutputFormat::Table => {
453 let colors_enabled = ctx.colors_enabled();
454 let table = TableBuilder::new(colors_enabled)
455 .header(["Namespace", "Nodes", "Subscriptions", "Status"])
456 .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
457 .status_row(
458 ["1", &self.nodes.to_string(), "0", "Online"],
459 StatusType::Success,
460 );
461 table.print();
462 }
463 _ => {
464 #[derive(Serialize)]
465 struct OpcuaServerInfo {
466 protocol: String,
467 endpoint: String,
468 nodes: usize,
469 security_mode: String,
470 namespaces: Vec<NamespaceInfo>,
471 status: String,
472 }
473 #[derive(Serialize)]
474 struct NamespaceInfo {
475 index: u32,
476 nodes: String,
477 subscriptions: u32,
478 status: String,
479 }
480 let info = OpcuaServerInfo {
481 protocol: "OPC UA".into(),
482 endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
483 nodes: self.nodes,
484 security_mode: self.security_mode.clone(),
485 namespaces: vec![
486 NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
487 NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
488 ],
489 status: "Online".into(),
490 };
491 let _ = ctx.output().write(&info);
492 }
493 }
494 }
495
496 if !is_quiet {
497 ctx.output().info("Press Ctrl+C to stop");
498 }
499 ctx.shutdown_signal().notified().await;
500
501 self.stop_server(ctx).await?;
502 if !is_quiet {
503 ctx.output().success("OPC UA simulator stopped");
504 }
505
506 Ok(CommandOutput::quiet_success())
507 }
508}
509
510#[async_trait]
511impl ProtocolCommand for OpcuaCommand {
512 fn protocol(&self) -> Protocol {
513 Protocol::OpcUa
514 }
515
516 fn default_port(&self) -> u16 {
517 4840
518 }
519
520 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
521 let output = ctx.output();
522 let spinner = output.spinner("Starting OPC UA server...");
523
524 let config = OpcUaServerConfig {
525 endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
526 server_name: "Mabinogion OPC UA Simulator".to_string(),
527 max_subscriptions: 1000,
528 max_monitored_items: 10000,
529 ..Default::default()
530 };
531
532 let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
533 crate::error::CliError::ExecutionFailed {
534 message: format!("Failed to create OPC UA server: {}", e)
535 }
536 })?);
537
538 let node_count = self.nodes.min(100);
541 for i in 0..node_count {
542 let node_id = format!("ns=2;i={}", 1000 + i);
543 let name = format!("Variable_{}", i);
544 let value = (i as f64) * 0.1;
545
546 if i % 2 == 0 {
547 let _ = server.add_writable_variable(node_id, name, value);
548 } else {
549 let _ = server.add_variable(node_id, name, value);
550 }
551 }
552
553 {
554 let mut server_guard = self.server.lock().await;
555 *server_guard = Some(server.clone());
556 }
557
558 let server_clone = server.clone();
559 let task = tokio::spawn(async move {
560 if let Err(e) = server_clone.start().await {
561 tracing::error!("OPC UA server error: {}", e);
562 }
563 });
564
565 {
566 let mut task_guard = self.server_task.lock().await;
567 *task_guard = Some(task);
568 }
569
570 tokio::time::sleep(Duration::from_millis(100)).await;
571
572 spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
573 Ok(())
574 }
575
576 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
577 if let Some(server) = self.server.lock().await.as_ref() {
578 let _ = server.stop().await;
579 }
580
581 if let Some(task) = self.server_task.lock().await.take() {
582 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
583 }
584
585 Ok(())
586 }
587}
588
589pub struct BacnetCommand {
595 bind_addr: SocketAddr,
596 device_instance: u32,
597 objects: usize,
598 bbmd_enabled: bool,
599 tags: Tags,
601 server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
603 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
605}
606
607impl BacnetCommand {
608 pub fn new() -> Self {
609 Self {
610 bind_addr: "0.0.0.0:47808".parse().unwrap(),
611 device_instance: 1234,
612 objects: 100,
613 bbmd_enabled: false,
614 tags: Tags::new(),
615 server: Arc::new(Mutex::new(None)),
616 server_task: Arc::new(Mutex::new(None)),
617 }
618 }
619
620 pub fn with_port(mut self, port: u16) -> Self {
621 self.bind_addr.set_port(port);
622 self
623 }
624
625 pub fn with_device_instance(mut self, instance: u32) -> Self {
626 self.device_instance = instance;
627 self
628 }
629
630 pub fn with_objects(mut self, objects: usize) -> Self {
631 self.objects = objects;
632 self
633 }
634
635 pub fn with_bbmd(mut self, enabled: bool) -> Self {
636 self.bbmd_enabled = enabled;
637 self
638 }
639
640 pub fn with_tags(mut self, tags: Tags) -> Self {
641 self.tags = tags;
642 self
643 }
644}
645
646impl Default for BacnetCommand {
647 fn default() -> Self {
648 Self::new()
649 }
650}
651
652#[async_trait]
653impl Command for BacnetCommand {
654 fn name(&self) -> &str {
655 "bacnet"
656 }
657
658 fn description(&self) -> &str {
659 "Start a BACnet/IP simulator"
660 }
661
662 fn requires_engine(&self) -> bool {
663 true
664 }
665
666 fn supports_shutdown(&self) -> bool {
667 true
668 }
669
670 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
671 let format = ctx.output().format();
672 let is_quiet = ctx.is_quiet();
673 let is_verbose = ctx.is_verbose();
674 let is_debug = ctx.is_debug();
675
676 if !is_quiet {
677 if matches!(format, OutputFormat::Table) {
678 let output = ctx.output();
679 output.header("BACnet/IP Simulator");
680 output.kv("Bind Address", self.bind_addr);
681 output.kv("Device Instance", self.device_instance);
682 output.kv("Objects", self.objects);
683 output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
684 }
685 }
686
687 if is_verbose {
689 let per_type = self.objects / 4;
690 ctx.vprintln(format!(" Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
691 ctx.vprintln(format!(" Device Name: Mabinogion BACnet Simulator"));
692 }
693
694 if is_debug {
696 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
697 ctx.dprintln(format!("Device instance: {}", self.device_instance));
698 ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
699 }
700
701 self.start_server(ctx).await?;
702
703 let per_type = self.objects / 4;
704
705 if !is_quiet {
706 match format {
707 OutputFormat::Table => {
708 let colors_enabled = ctx.colors_enabled();
709 let table = TableBuilder::new(colors_enabled)
710 .header(["Object Type", "Count", "Status"])
711 .status_row(["Device", "1", "Online"], StatusType::Success)
712 .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
713 .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
714 .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
715 .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
716 table.print();
717 }
718 _ => {
719 #[derive(Serialize)]
720 struct BacnetServerInfo {
721 protocol: String,
722 bind_address: String,
723 device_instance: u32,
724 objects: usize,
725 bbmd_enabled: bool,
726 object_types: Vec<ObjectTypeInfo>,
727 status: String,
728 }
729 #[derive(Serialize)]
730 struct ObjectTypeInfo {
731 object_type: String,
732 count: usize,
733 status: String,
734 }
735 let info = BacnetServerInfo {
736 protocol: "BACnet/IP".into(),
737 bind_address: self.bind_addr.to_string(),
738 device_instance: self.device_instance,
739 objects: self.objects,
740 bbmd_enabled: self.bbmd_enabled,
741 object_types: vec![
742 ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
743 ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
744 ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
745 ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
746 ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
747 ],
748 status: "Online".into(),
749 };
750 let _ = ctx.output().write(&info);
751 }
752 }
753 }
754
755 if !is_quiet {
756 ctx.output().info("Press Ctrl+C to stop");
757 }
758 ctx.shutdown_signal().notified().await;
759
760 self.stop_server(ctx).await?;
761 if !is_quiet {
762 ctx.output().success("BACnet simulator stopped");
763 }
764
765 Ok(CommandOutput::quiet_success())
766 }
767}
768
769#[async_trait]
770impl ProtocolCommand for BacnetCommand {
771 fn protocol(&self) -> Protocol {
772 Protocol::BacnetIp
773 }
774
775 fn default_port(&self) -> u16 {
776 47808
777 }
778
779 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
780 let output = ctx.output();
781 let spinner = output.spinner("Starting BACnet server...");
782
783 let config = BacnetServerConfig::new(self.device_instance)
784 .with_bind_addr(self.bind_addr)
785 .with_device_name("Mabinogion BACnet Simulator");
786
787 let registry = ObjectRegistry::new();
789
790 let objects_per_type = self.objects / 4;
791 for i in 0..objects_per_type {
792 let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
793 registry.register(Arc::new(ai));
794 }
795 for i in 0..objects_per_type {
796 let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
797 registry.register(Arc::new(ao));
798 }
799 for i in 0..objects_per_type {
800 let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
801 registry.register(Arc::new(bi));
802 }
803 for i in 0..objects_per_type {
804 let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
805 registry.register(Arc::new(bo));
806 }
807
808 let server = Arc::new(BACnetServer::new(config, registry));
809
810 {
811 let mut server_guard = self.server.lock().await;
812 *server_guard = Some(server.clone());
813 }
814
815 let server_clone = server.clone();
816 let task = tokio::spawn(async move {
817 if let Err(e) = server_clone.run().await {
818 tracing::error!("BACnet server error: {}", e);
819 }
820 });
821
822 {
823 let mut task_guard = self.server_task.lock().await;
824 *task_guard = Some(task);
825 }
826
827 tokio::time::sleep(Duration::from_millis(100)).await;
828
829 spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
830 Ok(())
831 }
832
833 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
834 if let Some(server) = self.server.lock().await.as_ref() {
835 server.shutdown();
836 }
837
838 if let Some(task) = self.server_task.lock().await.take() {
839 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
840 }
841
842 Ok(())
843 }
844}
845
846pub struct KnxCommand {
852 bind_addr: SocketAddr,
853 individual_address: String,
854 group_objects: usize,
855 tags: Tags,
857 server: Arc<Mutex<Option<Arc<KnxServer>>>>,
859 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
861}
862
863impl KnxCommand {
864 pub fn new() -> Self {
865 Self {
866 bind_addr: "0.0.0.0:3671".parse().unwrap(),
867 individual_address: "1.1.1".into(),
868 group_objects: 100,
869 tags: Tags::new(),
870 server: Arc::new(Mutex::new(None)),
871 server_task: Arc::new(Mutex::new(None)),
872 }
873 }
874
875 pub fn with_port(mut self, port: u16) -> Self {
876 self.bind_addr.set_port(port);
877 self
878 }
879
880 pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
881 self.individual_address = addr.into();
882 self
883 }
884
885 pub fn with_group_objects(mut self, count: usize) -> Self {
886 self.group_objects = count;
887 self
888 }
889
890 pub fn with_tags(mut self, tags: Tags) -> Self {
891 self.tags = tags;
892 self
893 }
894}
895
896impl Default for KnxCommand {
897 fn default() -> Self {
898 Self::new()
899 }
900}
901
902#[async_trait]
903impl Command for KnxCommand {
904 fn name(&self) -> &str {
905 "knx"
906 }
907
908 fn description(&self) -> &str {
909 "Start a KNXnet/IP simulator"
910 }
911
912 fn requires_engine(&self) -> bool {
913 true
914 }
915
916 fn supports_shutdown(&self) -> bool {
917 true
918 }
919
920 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
921 let format = ctx.output().format();
922 let is_quiet = ctx.is_quiet();
923 let is_verbose = ctx.is_verbose();
924 let is_debug = ctx.is_debug();
925
926 if !is_quiet {
927 if matches!(format, OutputFormat::Table) {
928 let output = ctx.output();
929 output.header("KNXnet/IP Simulator");
930 output.kv("Bind Address", self.bind_addr);
931 output.kv("Individual Address", &self.individual_address);
932 output.kv("Group Objects", self.group_objects);
933 }
934 }
935
936 if is_verbose {
938 ctx.vprintln(format!(" Max Connections: 10"));
939 ctx.vprintln(format!(" Services: Core, Device Management, Tunneling"));
940 }
941
942 if is_debug {
944 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
945 ctx.dprintln(format!("Individual address: {}", self.individual_address));
946 ctx.dprintln(format!("Group objects: {}", self.group_objects));
947 }
948
949 self.start_server(ctx).await?;
950
951 if !is_quiet {
952 match format {
953 OutputFormat::Table => {
954 let colors_enabled = ctx.colors_enabled();
955 let table = TableBuilder::new(colors_enabled)
956 .header(["Service", "Status"])
957 .status_row(["Core", "Ready"], StatusType::Success)
958 .status_row(["Device Management", "Ready"], StatusType::Success)
959 .status_row(["Tunneling", "Ready"], StatusType::Success);
960 table.print();
961 }
962 _ => {
963 #[derive(Serialize)]
964 struct KnxServerInfo {
965 protocol: String,
966 bind_address: String,
967 individual_address: String,
968 group_objects: usize,
969 services: Vec<ServiceInfo>,
970 status: String,
971 }
972 #[derive(Serialize)]
973 struct ServiceInfo {
974 service: String,
975 status: String,
976 }
977 let info = KnxServerInfo {
978 protocol: "KNXnet/IP".into(),
979 bind_address: self.bind_addr.to_string(),
980 individual_address: self.individual_address.clone(),
981 group_objects: self.group_objects,
982 services: vec![
983 ServiceInfo { service: "Core".into(), status: "Ready".into() },
984 ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
985 ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
986 ],
987 status: "Online".into(),
988 };
989 let _ = ctx.output().write(&info);
990 }
991 }
992 }
993
994 if !is_quiet {
995 ctx.output().info("Press Ctrl+C to stop");
996 }
997 ctx.shutdown_signal().notified().await;
998
999 self.stop_server(ctx).await?;
1000 if !is_quiet {
1001 ctx.output().success("KNX simulator stopped");
1002 }
1003
1004 Ok(CommandOutput::quiet_success())
1005 }
1006}
1007
1008#[async_trait]
1009impl ProtocolCommand for KnxCommand {
1010 fn protocol(&self) -> Protocol {
1011 Protocol::KnxIp
1012 }
1013
1014 fn default_port(&self) -> u16 {
1015 3671
1016 }
1017
1018 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1019 let output = ctx.output();
1020 let spinner = output.spinner("Starting KNX server...");
1021
1022 let individual_address: IndividualAddress = self.individual_address.parse()
1024 .map_err(|_| crate::error::CliError::ExecutionFailed {
1025 message: format!("Invalid individual address: {}", self.individual_address)
1026 })?;
1027
1028 let config = KnxServerConfig {
1029 bind_addr: self.bind_addr,
1030 individual_address,
1031 max_connections: 256,
1032 ..Default::default()
1033 };
1034
1035 let group_table = Arc::new(GroupObjectTable::new());
1037 let dpt_types = [
1038 DptId::new(1, 1), DptId::new(5, 1), DptId::new(9, 1), DptId::new(9, 4), DptId::new(9, 7), DptId::new(12, 1), DptId::new(13, 1), DptId::new(14, 56), ];
1047 let dpt_names = [
1048 "Switch", "Scaling", "Temperature", "Lux",
1049 "Humidity", "Counter", "SignedCounter", "Float",
1050 ];
1051
1052 for i in 0..self.group_objects {
1053 let main = ((i / 256) + 1) as u8;
1054 let middle = ((i / 8) % 8) as u8;
1055 let sub = (i % 256) as u8;
1056 let addr = GroupAddress::three_level(main, middle, sub);
1057 let dpt_idx = i % dpt_types.len();
1058 let name = format!("{}_{}", dpt_names[dpt_idx], i);
1059 if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1060 tracing::warn!("Failed to create group object {}: {}", i, e);
1061 }
1062 }
1063
1064 let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1065
1066 {
1067 let mut server_guard = self.server.lock().await;
1068 *server_guard = Some(server.clone());
1069 }
1070
1071 let server_clone = server.clone();
1072 let task = tokio::spawn(async move {
1073 if let Err(e) = server_clone.start().await {
1074 tracing::error!("KNX server error: {}", e);
1075 }
1076 });
1077
1078 {
1079 let mut task_guard = self.server_task.lock().await;
1080 *task_guard = Some(task);
1081 }
1082
1083 tokio::time::sleep(Duration::from_millis(100)).await;
1084
1085 spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1086 Ok(())
1087 }
1088
1089 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1090 let server_opt = self.server.lock().await.take();
1092 if let Some(server) = server_opt {
1093 let _ = tokio::task::spawn_blocking(move || {
1095 let rt = tokio::runtime::Handle::current();
1096 rt.block_on(async {
1097 let _ = server.stop().await;
1098 })
1099 }).await;
1100 }
1101
1102 if let Some(task) = self.server_task.lock().await.take() {
1103 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1104 }
1105
1106 Ok(())
1107 }
1108}