1use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
21use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
22use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
23use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
24
25#[async_trait]
27pub trait ProtocolCommand: Command {
28 fn protocol(&self) -> Protocol;
30
31 fn default_port(&self) -> u16;
33
34 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
36
37 async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
39}
40
41pub struct ModbusCommand {
47 bind_addr: SocketAddr,
49 devices: usize,
51 points_per_device: usize,
53 rtu_mode: bool,
55 serial_port: Option<String>,
57 tags: Tags,
59 server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
61 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
63}
64
65impl ModbusCommand {
66 pub fn new() -> Self {
67 Self {
68 bind_addr: "0.0.0.0:502".parse().unwrap(),
69 devices: 1,
70 points_per_device: 100,
71 rtu_mode: false,
72 serial_port: None,
73 tags: Tags::new(),
74 server: Arc::new(Mutex::new(None)),
75 server_task: Arc::new(Mutex::new(None)),
76 }
77 }
78
79 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
80 self.bind_addr = addr;
81 self
82 }
83
84 pub fn with_port(mut self, port: u16) -> Self {
85 self.bind_addr.set_port(port);
86 self
87 }
88
89 pub fn with_devices(mut self, devices: usize) -> Self {
90 self.devices = devices;
91 self
92 }
93
94 pub fn with_points(mut self, points: usize) -> Self {
95 self.points_per_device = points;
96 self
97 }
98
99 pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
100 self.rtu_mode = true;
101 self.serial_port = Some(serial_port.into());
102 self
103 }
104
105 pub fn with_tags(mut self, tags: Tags) -> Self {
106 self.tags = tags;
107 self
108 }
109}
110
111impl Default for ModbusCommand {
112 fn default() -> Self {
113 Self::new()
114 }
115}
116
117#[async_trait]
118impl Command for ModbusCommand {
119 fn name(&self) -> &str {
120 "modbus"
121 }
122
123 fn description(&self) -> &str {
124 "Start a Modbus TCP/RTU simulator"
125 }
126
127 fn requires_engine(&self) -> bool {
128 true
129 }
130
131 fn supports_shutdown(&self) -> bool {
132 true
133 }
134
135 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
136 let format = ctx.output().format();
137 let is_quiet = ctx.is_quiet();
138 let is_verbose = ctx.is_verbose();
139 let is_debug = ctx.is_debug();
140
141 if !is_quiet {
142 if matches!(format, OutputFormat::Table) {
143 let output = ctx.output();
144 if self.rtu_mode {
145 output.header("Modbus RTU Simulator");
146 output.kv(
147 "Serial Port",
148 self.serial_port.as_deref().unwrap_or("N/A"),
149 );
150 } else {
151 output.header("Modbus TCP Simulator");
152 output.kv("Bind Address", self.bind_addr);
153 }
154 output.kv("Devices", self.devices);
155 output.kv("Points per Device", self.points_per_device);
156 output.kv("Total Points", self.devices * self.points_per_device);
157 }
158 }
159
160 if is_verbose {
162 ctx.vprintln(format!(" Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
163 ctx.vprintln(format!(" Points Distribution: {} per register type", self.points_per_device / 4));
164 }
165
166 if is_debug {
168 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
169 ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
170 ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
171 }
172
173 self.start_server(ctx).await?;
174
175 let points_per_type = self.points_per_device / 4;
176
177 if !is_quiet {
178 match format {
179 OutputFormat::Table => {
180 let colors_enabled = ctx.colors_enabled();
181 let builder = TableBuilder::new(colors_enabled)
182 .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
183
184 let devices = self.devices;
185 let pts = points_per_type.to_string();
186 let table = PaginatedTable::default().render(builder, devices, 6, |i| {
187 let unit_id = (i + 1).to_string();
188 (
189 vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
190 StatusType::Success,
191 )
192 });
193 table.print();
194 }
195 _ => {
196 #[derive(Serialize)]
197 struct ModbusServerInfo {
198 protocol: String,
199 bind_address: String,
200 devices: usize,
201 points_per_device: usize,
202 total_points: usize,
203 rtu_mode: bool,
204 serial_port: Option<String>,
205 device_list: Vec<ModbusDeviceInfo>,
206 status: String,
207 }
208 #[derive(Serialize)]
209 struct ModbusDeviceInfo {
210 unit_id: usize,
211 holding_registers: usize,
212 input_registers: usize,
213 coils: usize,
214 discrete_inputs: usize,
215 status: String,
216 }
217 let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
218 .map(|i| ModbusDeviceInfo {
219 unit_id: i + 1,
220 holding_registers: points_per_type,
221 input_registers: points_per_type,
222 coils: points_per_type,
223 discrete_inputs: points_per_type,
224 status: "Online".into(),
225 })
226 .collect();
227 let info = ModbusServerInfo {
228 protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
229 bind_address: self.bind_addr.to_string(),
230 devices: self.devices,
231 points_per_device: self.points_per_device,
232 total_points: self.devices * self.points_per_device,
233 rtu_mode: self.rtu_mode,
234 serial_port: self.serial_port.clone(),
235 device_list,
236 status: "Online".into(),
237 };
238 let _ = ctx.output().write(&info);
239 }
240 }
241 }
242
243 if !is_quiet {
244 ctx.output().info("Press Ctrl+C to stop");
245 }
246 ctx.shutdown_signal().notified().await;
247
248 self.stop_server(ctx).await?;
249 if !is_quiet {
250 ctx.output().success("Modbus simulator stopped");
251 }
252
253 Ok(CommandOutput::quiet_success())
254 }
255}
256
257#[async_trait]
258impl ProtocolCommand for ModbusCommand {
259 fn protocol(&self) -> Protocol {
260 if self.rtu_mode {
261 Protocol::ModbusRtu
262 } else {
263 Protocol::ModbusTcp
264 }
265 }
266
267 fn default_port(&self) -> u16 {
268 502
269 }
270
271 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
272 let output = ctx.output();
273 let spinner = output.spinner("Starting Modbus server...");
274
275 let config = ServerConfigV2 {
276 bind_address: self.bind_addr,
277 ..Default::default()
278 };
279
280 let server = Arc::new(ModbusTcpServerV2::new(config));
281
282 for i in 0..self.devices {
283 let unit_id = (i + 1) as u8;
284 let points = (self.points_per_device / 4) as u16;
285 let device_config = ModbusDeviceConfig {
286 unit_id,
287 name: format!("Device-{}", unit_id),
288 holding_registers: points,
289 input_registers: points,
290 coils: points,
291 discrete_inputs: points,
292 response_delay_ms: 0,
293 tags: self.tags.clone(),
294 };
295 let device = ModbusDevice::new(device_config);
296 server.add_device(device);
297 }
298
299 {
300 let mut server_guard = self.server.lock().await;
301 *server_guard = Some(server.clone());
302 }
303
304 let server_clone = server.clone();
305 let task = tokio::spawn(async move {
306 if let Err(e) = server_clone.run().await {
307 tracing::error!("Modbus server error: {}", e);
308 }
309 });
310
311 {
312 let mut task_guard = self.server_task.lock().await;
313 *task_guard = Some(task);
314 }
315
316 tokio::time::sleep(Duration::from_millis(100)).await;
317
318 spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
319 Ok(())
320 }
321
322 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
323 if let Some(server) = self.server.lock().await.as_ref() {
324 server.shutdown();
325 }
326
327 if let Some(task) = self.server_task.lock().await.take() {
328 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
329 }
330
331 Ok(())
332 }
333}
334
335pub struct OpcuaCommand {
341 bind_addr: SocketAddr,
342 endpoint_path: String,
343 nodes: usize,
344 security_mode: String,
345 server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
347 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
349}
350
351impl OpcuaCommand {
352 pub fn new() -> Self {
353 Self {
354 bind_addr: "0.0.0.0:4840".parse().unwrap(),
355 endpoint_path: "/".into(),
356 nodes: 1000,
357 security_mode: "None".into(),
358 server: Arc::new(Mutex::new(None)),
359 server_task: Arc::new(Mutex::new(None)),
360 }
361 }
362
363 pub fn with_port(mut self, port: u16) -> Self {
364 self.bind_addr.set_port(port);
365 self
366 }
367
368 pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
369 self.endpoint_path = path.into();
370 self
371 }
372
373 pub fn with_nodes(mut self, nodes: usize) -> Self {
374 self.nodes = nodes;
375 self
376 }
377
378 pub fn with_security(mut self, mode: impl Into<String>) -> Self {
379 self.security_mode = mode.into();
380 self
381 }
382}
383
384impl Default for OpcuaCommand {
385 fn default() -> Self {
386 Self::new()
387 }
388}
389
390#[async_trait]
391impl Command for OpcuaCommand {
392 fn name(&self) -> &str {
393 "opcua"
394 }
395
396 fn description(&self) -> &str {
397 "Start an OPC UA server simulator"
398 }
399
400 fn requires_engine(&self) -> bool {
401 true
402 }
403
404 fn supports_shutdown(&self) -> bool {
405 true
406 }
407
408 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
409 let format = ctx.output().format();
410 let is_quiet = ctx.is_quiet();
411 let is_verbose = ctx.is_verbose();
412 let is_debug = ctx.is_debug();
413
414 if !is_quiet {
415 if matches!(format, OutputFormat::Table) {
416 let output = ctx.output();
417 output.header("OPC UA Simulator");
418 output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
419 output.kv("Nodes", self.nodes);
420 output.kv("Security Mode", &self.security_mode);
421 }
422 }
423
424 if is_verbose {
426 ctx.vprintln(format!(" Bind Address: {}", self.bind_addr));
427 ctx.vprintln(format!(" Endpoint Path: {}", self.endpoint_path));
428 ctx.vprintln(format!(" Max Subscriptions: 1000"));
429 ctx.vprintln(format!(" Max Monitored Items: 10000"));
430 }
431
432 if is_debug {
434 ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
435 ctx.dprintln(format!("Node count: {}", self.nodes));
436 ctx.dprintln(format!("Security mode: {}", self.security_mode));
437 ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
438 }
439
440 self.start_server(ctx).await?;
441
442 if !is_quiet {
443 match format {
444 OutputFormat::Table => {
445 let colors_enabled = ctx.colors_enabled();
446 let table = TableBuilder::new(colors_enabled)
447 .header(["Namespace", "Nodes", "Subscriptions", "Status"])
448 .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
449 .status_row(
450 ["1", &self.nodes.to_string(), "0", "Online"],
451 StatusType::Success,
452 );
453 table.print();
454 }
455 _ => {
456 #[derive(Serialize)]
457 struct OpcuaServerInfo {
458 protocol: String,
459 endpoint: String,
460 nodes: usize,
461 security_mode: String,
462 namespaces: Vec<NamespaceInfo>,
463 status: String,
464 }
465 #[derive(Serialize)]
466 struct NamespaceInfo {
467 index: u32,
468 nodes: String,
469 subscriptions: u32,
470 status: String,
471 }
472 let info = OpcuaServerInfo {
473 protocol: "OPC UA".into(),
474 endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
475 nodes: self.nodes,
476 security_mode: self.security_mode.clone(),
477 namespaces: vec![
478 NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
479 NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
480 ],
481 status: "Online".into(),
482 };
483 let _ = ctx.output().write(&info);
484 }
485 }
486 }
487
488 if !is_quiet {
489 ctx.output().info("Press Ctrl+C to stop");
490 }
491 ctx.shutdown_signal().notified().await;
492
493 self.stop_server(ctx).await?;
494 if !is_quiet {
495 ctx.output().success("OPC UA simulator stopped");
496 }
497
498 Ok(CommandOutput::quiet_success())
499 }
500}
501
502#[async_trait]
503impl ProtocolCommand for OpcuaCommand {
504 fn protocol(&self) -> Protocol {
505 Protocol::OpcUa
506 }
507
508 fn default_port(&self) -> u16 {
509 4840
510 }
511
512 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
513 let output = ctx.output();
514 let spinner = output.spinner("Starting OPC UA server...");
515
516 let config = OpcUaServerConfig {
517 endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
518 server_name: "Mabinogion OPC UA Simulator".to_string(),
519 max_subscriptions: 1000,
520 max_monitored_items: 10000,
521 ..Default::default()
522 };
523
524 let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
525 crate::error::CliError::ExecutionFailed {
526 message: format!("Failed to create OPC UA server: {}", e)
527 }
528 })?);
529
530 let node_count = self.nodes.min(100);
533 for i in 0..node_count {
534 let node_id = format!("ns=2;i={}", 1000 + i);
535 let name = format!("Variable_{}", i);
536 let value = (i as f64) * 0.1;
537
538 if i % 2 == 0 {
539 let _ = server.add_writable_variable(node_id, name, value);
540 } else {
541 let _ = server.add_variable(node_id, name, value);
542 }
543 }
544
545 {
546 let mut server_guard = self.server.lock().await;
547 *server_guard = Some(server.clone());
548 }
549
550 let server_clone = server.clone();
551 let task = tokio::spawn(async move {
552 if let Err(e) = server_clone.start().await {
553 tracing::error!("OPC UA server error: {}", e);
554 }
555 });
556
557 {
558 let mut task_guard = self.server_task.lock().await;
559 *task_guard = Some(task);
560 }
561
562 tokio::time::sleep(Duration::from_millis(100)).await;
563
564 spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
565 Ok(())
566 }
567
568 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
569 if let Some(server) = self.server.lock().await.as_ref() {
570 let _ = server.stop().await;
571 }
572
573 if let Some(task) = self.server_task.lock().await.take() {
574 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
575 }
576
577 Ok(())
578 }
579}
580
581pub struct BacnetCommand {
587 bind_addr: SocketAddr,
588 device_instance: u32,
589 objects: usize,
590 bbmd_enabled: bool,
591 server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
593 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
595}
596
597impl BacnetCommand {
598 pub fn new() -> Self {
599 Self {
600 bind_addr: "0.0.0.0:47808".parse().unwrap(),
601 device_instance: 1234,
602 objects: 100,
603 bbmd_enabled: false,
604 server: Arc::new(Mutex::new(None)),
605 server_task: Arc::new(Mutex::new(None)),
606 }
607 }
608
609 pub fn with_port(mut self, port: u16) -> Self {
610 self.bind_addr.set_port(port);
611 self
612 }
613
614 pub fn with_device_instance(mut self, instance: u32) -> Self {
615 self.device_instance = instance;
616 self
617 }
618
619 pub fn with_objects(mut self, objects: usize) -> Self {
620 self.objects = objects;
621 self
622 }
623
624 pub fn with_bbmd(mut self, enabled: bool) -> Self {
625 self.bbmd_enabled = enabled;
626 self
627 }
628}
629
630impl Default for BacnetCommand {
631 fn default() -> Self {
632 Self::new()
633 }
634}
635
636#[async_trait]
637impl Command for BacnetCommand {
638 fn name(&self) -> &str {
639 "bacnet"
640 }
641
642 fn description(&self) -> &str {
643 "Start a BACnet/IP simulator"
644 }
645
646 fn requires_engine(&self) -> bool {
647 true
648 }
649
650 fn supports_shutdown(&self) -> bool {
651 true
652 }
653
654 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
655 let format = ctx.output().format();
656 let is_quiet = ctx.is_quiet();
657 let is_verbose = ctx.is_verbose();
658 let is_debug = ctx.is_debug();
659
660 if !is_quiet {
661 if matches!(format, OutputFormat::Table) {
662 let output = ctx.output();
663 output.header("BACnet/IP Simulator");
664 output.kv("Bind Address", self.bind_addr);
665 output.kv("Device Instance", self.device_instance);
666 output.kv("Objects", self.objects);
667 output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
668 }
669 }
670
671 if is_verbose {
673 let per_type = self.objects / 4;
674 ctx.vprintln(format!(" Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
675 ctx.vprintln(format!(" Device Name: Mabinogion BACnet Simulator"));
676 }
677
678 if is_debug {
680 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
681 ctx.dprintln(format!("Device instance: {}", self.device_instance));
682 ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
683 }
684
685 self.start_server(ctx).await?;
686
687 let per_type = self.objects / 4;
688
689 if !is_quiet {
690 match format {
691 OutputFormat::Table => {
692 let colors_enabled = ctx.colors_enabled();
693 let table = TableBuilder::new(colors_enabled)
694 .header(["Object Type", "Count", "Status"])
695 .status_row(["Device", "1", "Online"], StatusType::Success)
696 .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
697 .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
698 .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
699 .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
700 table.print();
701 }
702 _ => {
703 #[derive(Serialize)]
704 struct BacnetServerInfo {
705 protocol: String,
706 bind_address: String,
707 device_instance: u32,
708 objects: usize,
709 bbmd_enabled: bool,
710 object_types: Vec<ObjectTypeInfo>,
711 status: String,
712 }
713 #[derive(Serialize)]
714 struct ObjectTypeInfo {
715 object_type: String,
716 count: usize,
717 status: String,
718 }
719 let info = BacnetServerInfo {
720 protocol: "BACnet/IP".into(),
721 bind_address: self.bind_addr.to_string(),
722 device_instance: self.device_instance,
723 objects: self.objects,
724 bbmd_enabled: self.bbmd_enabled,
725 object_types: vec![
726 ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
727 ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
728 ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
729 ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
730 ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
731 ],
732 status: "Online".into(),
733 };
734 let _ = ctx.output().write(&info);
735 }
736 }
737 }
738
739 if !is_quiet {
740 ctx.output().info("Press Ctrl+C to stop");
741 }
742 ctx.shutdown_signal().notified().await;
743
744 self.stop_server(ctx).await?;
745 if !is_quiet {
746 ctx.output().success("BACnet simulator stopped");
747 }
748
749 Ok(CommandOutput::quiet_success())
750 }
751}
752
753#[async_trait]
754impl ProtocolCommand for BacnetCommand {
755 fn protocol(&self) -> Protocol {
756 Protocol::BacnetIp
757 }
758
759 fn default_port(&self) -> u16 {
760 47808
761 }
762
763 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
764 let output = ctx.output();
765 let spinner = output.spinner("Starting BACnet server...");
766
767 let config = BacnetServerConfig::new(self.device_instance)
768 .with_bind_addr(self.bind_addr)
769 .with_device_name("Mabinogion BACnet Simulator");
770
771 let registry = ObjectRegistry::new();
773
774 let objects_per_type = self.objects / 4;
775 for i in 0..objects_per_type {
776 let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
777 registry.register(Arc::new(ai));
778 }
779 for i in 0..objects_per_type {
780 let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
781 registry.register(Arc::new(ao));
782 }
783 for i in 0..objects_per_type {
784 let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
785 registry.register(Arc::new(bi));
786 }
787 for i in 0..objects_per_type {
788 let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
789 registry.register(Arc::new(bo));
790 }
791
792 let server = Arc::new(BACnetServer::new(config, registry));
793
794 {
795 let mut server_guard = self.server.lock().await;
796 *server_guard = Some(server.clone());
797 }
798
799 let server_clone = server.clone();
800 let task = tokio::spawn(async move {
801 if let Err(e) = server_clone.run().await {
802 tracing::error!("BACnet server error: {}", e);
803 }
804 });
805
806 {
807 let mut task_guard = self.server_task.lock().await;
808 *task_guard = Some(task);
809 }
810
811 tokio::time::sleep(Duration::from_millis(100)).await;
812
813 spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
814 Ok(())
815 }
816
817 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
818 if let Some(server) = self.server.lock().await.as_ref() {
819 server.shutdown();
820 }
821
822 if let Some(task) = self.server_task.lock().await.take() {
823 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
824 }
825
826 Ok(())
827 }
828}
829
830pub struct KnxCommand {
836 bind_addr: SocketAddr,
837 individual_address: String,
838 group_objects: usize,
839 server: Arc<Mutex<Option<Arc<KnxServer>>>>,
841 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
843}
844
845impl KnxCommand {
846 pub fn new() -> Self {
847 Self {
848 bind_addr: "0.0.0.0:3671".parse().unwrap(),
849 individual_address: "1.1.1".into(),
850 group_objects: 100,
851 server: Arc::new(Mutex::new(None)),
852 server_task: Arc::new(Mutex::new(None)),
853 }
854 }
855
856 pub fn with_port(mut self, port: u16) -> Self {
857 self.bind_addr.set_port(port);
858 self
859 }
860
861 pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
862 self.individual_address = addr.into();
863 self
864 }
865
866 pub fn with_group_objects(mut self, count: usize) -> Self {
867 self.group_objects = count;
868 self
869 }
870}
871
872impl Default for KnxCommand {
873 fn default() -> Self {
874 Self::new()
875 }
876}
877
878#[async_trait]
879impl Command for KnxCommand {
880 fn name(&self) -> &str {
881 "knx"
882 }
883
884 fn description(&self) -> &str {
885 "Start a KNXnet/IP simulator"
886 }
887
888 fn requires_engine(&self) -> bool {
889 true
890 }
891
892 fn supports_shutdown(&self) -> bool {
893 true
894 }
895
896 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
897 let format = ctx.output().format();
898 let is_quiet = ctx.is_quiet();
899 let is_verbose = ctx.is_verbose();
900 let is_debug = ctx.is_debug();
901
902 if !is_quiet {
903 if matches!(format, OutputFormat::Table) {
904 let output = ctx.output();
905 output.header("KNXnet/IP Simulator");
906 output.kv("Bind Address", self.bind_addr);
907 output.kv("Individual Address", &self.individual_address);
908 output.kv("Group Objects", self.group_objects);
909 }
910 }
911
912 if is_verbose {
914 ctx.vprintln(format!(" Max Connections: 10"));
915 ctx.vprintln(format!(" Services: Core, Device Management, Tunneling"));
916 }
917
918 if is_debug {
920 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
921 ctx.dprintln(format!("Individual address: {}", self.individual_address));
922 ctx.dprintln(format!("Group objects: {}", self.group_objects));
923 }
924
925 self.start_server(ctx).await?;
926
927 if !is_quiet {
928 match format {
929 OutputFormat::Table => {
930 let colors_enabled = ctx.colors_enabled();
931 let table = TableBuilder::new(colors_enabled)
932 .header(["Service", "Status"])
933 .status_row(["Core", "Ready"], StatusType::Success)
934 .status_row(["Device Management", "Ready"], StatusType::Success)
935 .status_row(["Tunneling", "Ready"], StatusType::Success);
936 table.print();
937 }
938 _ => {
939 #[derive(Serialize)]
940 struct KnxServerInfo {
941 protocol: String,
942 bind_address: String,
943 individual_address: String,
944 group_objects: usize,
945 services: Vec<ServiceInfo>,
946 status: String,
947 }
948 #[derive(Serialize)]
949 struct ServiceInfo {
950 service: String,
951 status: String,
952 }
953 let info = KnxServerInfo {
954 protocol: "KNXnet/IP".into(),
955 bind_address: self.bind_addr.to_string(),
956 individual_address: self.individual_address.clone(),
957 group_objects: self.group_objects,
958 services: vec![
959 ServiceInfo { service: "Core".into(), status: "Ready".into() },
960 ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
961 ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
962 ],
963 status: "Online".into(),
964 };
965 let _ = ctx.output().write(&info);
966 }
967 }
968 }
969
970 if !is_quiet {
971 ctx.output().info("Press Ctrl+C to stop");
972 }
973 ctx.shutdown_signal().notified().await;
974
975 self.stop_server(ctx).await?;
976 if !is_quiet {
977 ctx.output().success("KNX simulator stopped");
978 }
979
980 Ok(CommandOutput::quiet_success())
981 }
982}
983
984#[async_trait]
985impl ProtocolCommand for KnxCommand {
986 fn protocol(&self) -> Protocol {
987 Protocol::KnxIp
988 }
989
990 fn default_port(&self) -> u16 {
991 3671
992 }
993
994 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
995 let output = ctx.output();
996 let spinner = output.spinner("Starting KNX server...");
997
998 let individual_address: IndividualAddress = self.individual_address.parse()
1000 .map_err(|_| crate::error::CliError::ExecutionFailed {
1001 message: format!("Invalid individual address: {}", self.individual_address)
1002 })?;
1003
1004 let config = KnxServerConfig {
1005 bind_addr: self.bind_addr,
1006 individual_address,
1007 max_connections: 256,
1008 ..Default::default()
1009 };
1010
1011 let group_table = Arc::new(GroupObjectTable::new());
1013 let dpt_types = [
1014 DptId::new(1, 1), DptId::new(5, 1), DptId::new(9, 1), DptId::new(9, 4), DptId::new(9, 7), DptId::new(12, 1), DptId::new(13, 1), DptId::new(14, 56), ];
1023 let dpt_names = [
1024 "Switch", "Scaling", "Temperature", "Lux",
1025 "Humidity", "Counter", "SignedCounter", "Float",
1026 ];
1027
1028 for i in 0..self.group_objects {
1029 let main = ((i / 256) + 1) as u8;
1030 let middle = ((i / 8) % 8) as u8;
1031 let sub = (i % 256) as u8;
1032 let addr = GroupAddress::three_level(main, middle, sub);
1033 let dpt_idx = i % dpt_types.len();
1034 let name = format!("{}_{}", dpt_names[dpt_idx], i);
1035 if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1036 tracing::warn!("Failed to create group object {}: {}", i, e);
1037 }
1038 }
1039
1040 let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1041
1042 {
1043 let mut server_guard = self.server.lock().await;
1044 *server_guard = Some(server.clone());
1045 }
1046
1047 let server_clone = server.clone();
1048 let task = tokio::spawn(async move {
1049 if let Err(e) = server_clone.start().await {
1050 tracing::error!("KNX server error: {}", e);
1051 }
1052 });
1053
1054 {
1055 let mut task_guard = self.server_task.lock().await;
1056 *task_guard = Some(task);
1057 }
1058
1059 tokio::time::sleep(Duration::from_millis(100)).await;
1060
1061 spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1062 Ok(())
1063 }
1064
1065 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1066 let server_opt = self.server.lock().await.take();
1068 if let Some(server) = server_opt {
1069 let _ = tokio::task::spawn_blocking(move || {
1071 let rt = tokio::runtime::Handle::current();
1072 rt.block_on(async {
1073 let _ = server.stop().await;
1074 })
1075 }).await;
1076 }
1077
1078 if let Some(task) = self.server_task.lock().await.take() {
1079 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1080 }
1081
1082 Ok(())
1083 }
1084}