1use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19use mabi_bacnet::prelude::{
21 default_object_descriptors, BACnetServer, ObjectRegistry, ServerConfig as BacnetServerConfig,
22};
23use mabi_knx::{
24 DptId, GroupAddress, GroupObjectTable, IndividualAddress, KnxServer, KnxServerConfig,
25};
26use mabi_modbus::{tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig, ModbusTcpServerV2};
27use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
28
29async fn check_port_availability(addr: SocketAddr) {
34 use tokio::io::{AsyncReadExt, AsyncWriteExt};
35 use tokio::net::TcpStream;
36
37 let connect_result =
39 tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await;
40
41 match connect_result {
42 Ok(Ok(_first_stream)) => {
43 drop(_first_stream);
45
46 let probe_result = tokio::time::timeout(Duration::from_secs(1), async {
47 let mut stream = TcpStream::connect(addr).await?;
48 let request: [u8; 12] = [
50 0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x03, 0x00, 0x00, 0x00, 0x01,
51 ];
52 stream.write_all(&request).await?;
53 let mut response = [0u8; 32];
54 let n = stream.read(&mut response).await?;
55 Ok::<_, std::io::Error>(n)
56 })
57 .await;
58
59 match probe_result {
60 Ok(Ok(n)) if n >= 7 => {
61 tracing::warn!(
62 port = addr.port(),
63 "Port {} is already in use by a responding Modbus server. \
64 The new server will fail to bind.",
65 addr.port()
66 );
67 }
68 _ => {
69 tracing::warn!(
71 port = addr.port(),
72 "Port {} is in use: TCP connects but no Modbus response. \
73 This may be a suspended (zombie) process holding the port.\n \
74 Diagnostic: lsof -i :{} | grep LISTEN\n \
75 To kill: kill $(lsof -ti :{} -sTCP:LISTEN)",
76 addr.port(),
77 addr.port(),
78 addr.port()
79 );
80 }
81 }
82 }
83 Ok(Err(_)) | Err(_) => {
84 tracing::debug!(port = addr.port(), "Port {} is available", addr.port());
86 }
87 }
88}
89
/// Common interface for commands that run a protocol simulator server.
///
/// Extends [`Command`] with the protocol identity, the protocol's default port,
/// and the hooks used to start and stop the underlying server.
#[async_trait]
pub trait ProtocolCommand: Command {
    /// Returns the protocol this command simulates.
    fn protocol(&self) -> Protocol;

    /// Returns the default port number for the protocol.
    fn default_port(&self) -> u16;

    /// Starts the simulator server, returning an error if startup fails.
    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;

    /// Stops the simulator server.
    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
}
105
/// Command that runs the Modbus simulator.
///
/// Settings are supplied through the `with_*` builder methods. The running
/// server and the task driving it are kept behind async mutexes so that the
/// `&self` start/stop hooks can store and retrieve them.
pub struct ModbusCommand {
    /// Socket address passed to the TCP server configuration.
    bind_addr: SocketAddr,
    /// Number of simulated devices; unit IDs are assigned starting at 1.
    devices: usize,
    /// Data points per device; split four ways across holding registers,
    /// input registers, coils and discrete inputs.
    points_per_device: usize,
    /// Set by `with_rtu_mode`; selects the RTU labels and `Protocol::ModbusRtu`.
    rtu_mode: bool,
    /// Serial port name recorded by `with_rtu_mode`.
    serial_port: Option<String>,
    /// Tags cloned into every device configuration.
    tags: Tags,
    /// Handle to the running server, populated by `start_server`.
    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
    /// Join handle of the spawned task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
129
130impl ModbusCommand {
131 pub fn new() -> Self {
132 Self {
133 bind_addr: "0.0.0.0:502".parse().unwrap(),
134 devices: 1,
135 points_per_device: 100,
136 rtu_mode: false,
137 serial_port: None,
138 tags: Tags::new(),
139 server: Arc::new(Mutex::new(None)),
140 server_task: Arc::new(Mutex::new(None)),
141 }
142 }
143
144 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
145 self.bind_addr = addr;
146 self
147 }
148
149 pub fn with_port(mut self, port: u16) -> Self {
150 self.bind_addr.set_port(port);
151 self
152 }
153
154 pub fn with_devices(mut self, devices: usize) -> Self {
155 self.devices = devices;
156 self
157 }
158
159 pub fn with_points(mut self, points: usize) -> Self {
160 self.points_per_device = points;
161 self
162 }
163
164 pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
165 self.rtu_mode = true;
166 self.serial_port = Some(serial_port.into());
167 self
168 }
169
170 pub fn with_tags(mut self, tags: Tags) -> Self {
171 self.tags = tags;
172 self
173 }
174}
175
176impl Default for ModbusCommand {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182#[async_trait]
183impl Command for ModbusCommand {
184 fn name(&self) -> &str {
185 "modbus"
186 }
187
188 fn description(&self) -> &str {
189 "Start a Modbus TCP/RTU simulator"
190 }
191
192 fn requires_engine(&self) -> bool {
193 true
194 }
195
196 fn supports_shutdown(&self) -> bool {
197 true
198 }
199
200 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
201 let format = ctx.output().format();
202 let is_quiet = ctx.is_quiet();
203 let is_verbose = ctx.is_verbose();
204 let is_debug = ctx.is_debug();
205
206 if !is_quiet {
207 if matches!(format, OutputFormat::Table) {
208 let output = ctx.output();
209 if self.rtu_mode {
210 output.header("Modbus RTU Simulator");
211 output.kv("Serial Port", self.serial_port.as_deref().unwrap_or("N/A"));
212 } else {
213 output.header("Modbus TCP Simulator");
214 output.kv("Bind Address", self.bind_addr);
215 }
216 output.kv("Devices", self.devices);
217 output.kv("Points per Device", self.points_per_device);
218 output.kv("Total Points", self.devices * self.points_per_device);
219 }
220 }
221
222 if is_verbose {
224 ctx.vprintln(format!(
225 " Protocol Mode: {}",
226 if self.rtu_mode { "RTU" } else { "TCP" }
227 ));
228 ctx.vprintln(format!(
229 " Points Distribution: {} per register type",
230 self.points_per_device / 4
231 ));
232 }
233
234 if is_debug {
236 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
237 ctx.dprintln(format!(
238 "RTU mode: {}, Serial: {:?}",
239 self.rtu_mode, self.serial_port
240 ));
241 ctx.dprintln(format!(
242 "Devices: {}, Points/device: {}",
243 self.devices, self.points_per_device
244 ));
245 }
246
247 self.start_server(ctx).await?;
248
249 let points_per_type = self.points_per_device / 4;
250
251 if !is_quiet {
252 match format {
253 OutputFormat::Table => {
254 let colors_enabled = ctx.colors_enabled();
255 let builder = TableBuilder::new(colors_enabled).header([
256 "Unit ID",
257 "Holding Regs",
258 "Input Regs",
259 "Coils",
260 "Discrete",
261 "Status",
262 ]);
263
264 let devices = self.devices;
265 let pts = points_per_type.to_string();
266 let table = PaginatedTable::default().render(builder, devices, 6, |i| {
267 let unit_id = (i + 1).to_string();
268 (
269 vec![
270 unit_id,
271 pts.clone(),
272 pts.clone(),
273 pts.clone(),
274 pts.clone(),
275 "Online".into(),
276 ],
277 StatusType::Success,
278 )
279 });
280 table.print();
281 }
282 _ => {
283 #[derive(Serialize)]
284 struct ModbusServerInfo {
285 protocol: String,
286 bind_address: String,
287 devices: usize,
288 points_per_device: usize,
289 total_points: usize,
290 rtu_mode: bool,
291 serial_port: Option<String>,
292 device_list: Vec<ModbusDeviceInfo>,
293 status: String,
294 }
295 #[derive(Serialize)]
296 struct ModbusDeviceInfo {
297 unit_id: usize,
298 holding_registers: usize,
299 input_registers: usize,
300 coils: usize,
301 discrete_inputs: usize,
302 status: String,
303 }
304 let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
305 .map(|i| ModbusDeviceInfo {
306 unit_id: i + 1,
307 holding_registers: points_per_type,
308 input_registers: points_per_type,
309 coils: points_per_type,
310 discrete_inputs: points_per_type,
311 status: "Online".into(),
312 })
313 .collect();
314 let info = ModbusServerInfo {
315 protocol: if self.rtu_mode {
316 "Modbus RTU".into()
317 } else {
318 "Modbus TCP".into()
319 },
320 bind_address: self.bind_addr.to_string(),
321 devices: self.devices,
322 points_per_device: self.points_per_device,
323 total_points: self.devices * self.points_per_device,
324 rtu_mode: self.rtu_mode,
325 serial_port: self.serial_port.clone(),
326 device_list,
327 status: "Online".into(),
328 };
329 let _ = ctx.output().write(&info);
330 }
331 }
332 }
333
334 if !is_quiet {
335 ctx.output().info("Press Ctrl+C to stop");
336 }
337 ctx.shutdown_signal().notified().await;
338
339 self.stop_server(ctx).await?;
340 if !is_quiet {
341 ctx.output().success("Modbus simulator stopped");
342 }
343
344 Ok(CommandOutput::quiet_success())
345 }
346}
347
348#[async_trait]
349impl ProtocolCommand for ModbusCommand {
350 fn protocol(&self) -> Protocol {
351 if self.rtu_mode {
352 Protocol::ModbusRtu
353 } else {
354 Protocol::ModbusTcp
355 }
356 }
357
358 fn default_port(&self) -> u16 {
359 502
360 }
361
362 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
363 let output = ctx.output();
364
365 check_port_availability(self.bind_addr).await;
367
368 let spinner = output.spinner("Starting Modbus server...");
369
370 let config = ServerConfigV2 {
371 bind_address: self.bind_addr,
372 ..Default::default()
373 };
374
375 let server = Arc::new(ModbusTcpServerV2::new(config));
376
377 for i in 0..self.devices {
378 let unit_id = (i + 1) as u8;
379 let points = (self.points_per_device / 4) as u16;
380 let device_config = ModbusDeviceConfig {
381 unit_id,
382 name: format!("Device-{}", unit_id),
383 holding_registers: points,
384 input_registers: points,
385 coils: points,
386 discrete_inputs: points,
387 response_delay_ms: 0,
388 tags: self.tags.clone(),
389 };
390 let device = ModbusDevice::new(device_config);
391 server.add_device(device);
392 }
393
394 {
395 let mut server_guard = self.server.lock().await;
396 *server_guard = Some(server.clone());
397 }
398
399 let server_clone = server.clone();
400 let task = tokio::spawn(async move {
401 if let Err(e) = server_clone.run().await {
402 tracing::error!("Modbus server error: {}", e);
403 }
404 });
405
406 {
407 let mut task_guard = self.server_task.lock().await;
408 *task_guard = Some(task);
409 }
410
411 tokio::time::sleep(Duration::from_millis(100)).await;
412
413 {
415 let task_guard = self.server_task.lock().await;
416 if let Some(task) = task_guard.as_ref() {
417 if task.is_finished() {
418 spinner.finish_with_message("Failed to start server");
419 return Err(crate::error::CliError::PortInUse {
420 port: self.bind_addr.port(),
421 });
422 }
423 }
424 }
425
426 spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
427 Ok(())
428 }
429
430 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
431 if let Some(server) = self.server.lock().await.as_ref() {
432 server.shutdown();
433 }
434
435 if let Some(task) = self.server_task.lock().await.take() {
436 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
437 }
438
439 Ok(())
440 }
441}
442
/// Command that runs the OPC UA server simulator.
///
/// Settings are supplied through the `with_*` builder methods. The running
/// server and the task driving it are stored behind async mutexes.
pub struct OpcuaCommand {
    /// Socket address used to build the `opc.tcp://` endpoint URL.
    bind_addr: SocketAddr,
    /// Path appended to the bind address in the endpoint URL.
    endpoint_path: String,
    /// Requested node count; shown in the output.
    nodes: usize,
    /// Security mode label; shown in the output.
    security_mode: String,
    /// Tags supplied via `with_tags`.
    tags: Tags,
    /// Handle to the running server, populated by `start_server`.
    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
    /// Join handle of the spawned task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
460
461impl OpcuaCommand {
462 pub fn new() -> Self {
463 Self {
464 bind_addr: "0.0.0.0:4840".parse().unwrap(),
465 endpoint_path: "/".into(),
466 nodes: 1000,
467 security_mode: "None".into(),
468 tags: Tags::new(),
469 server: Arc::new(Mutex::new(None)),
470 server_task: Arc::new(Mutex::new(None)),
471 }
472 }
473
474 pub fn with_port(mut self, port: u16) -> Self {
475 self.bind_addr.set_port(port);
476 self
477 }
478
479 pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
480 self.endpoint_path = path.into();
481 self
482 }
483
484 pub fn with_nodes(mut self, nodes: usize) -> Self {
485 self.nodes = nodes;
486 self
487 }
488
489 pub fn with_security(mut self, mode: impl Into<String>) -> Self {
490 self.security_mode = mode.into();
491 self
492 }
493
494 pub fn with_tags(mut self, tags: Tags) -> Self {
495 self.tags = tags;
496 self
497 }
498}
499
500impl Default for OpcuaCommand {
501 fn default() -> Self {
502 Self::new()
503 }
504}
505
506#[async_trait]
507impl Command for OpcuaCommand {
508 fn name(&self) -> &str {
509 "opcua"
510 }
511
512 fn description(&self) -> &str {
513 "Start an OPC UA server simulator"
514 }
515
516 fn requires_engine(&self) -> bool {
517 true
518 }
519
520 fn supports_shutdown(&self) -> bool {
521 true
522 }
523
524 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
525 let format = ctx.output().format();
526 let is_quiet = ctx.is_quiet();
527 let is_verbose = ctx.is_verbose();
528 let is_debug = ctx.is_debug();
529
530 if !is_quiet {
531 if matches!(format, OutputFormat::Table) {
532 let output = ctx.output();
533 output.header("OPC UA Simulator");
534 output.kv(
535 "Endpoint",
536 format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
537 );
538 output.kv("Nodes", self.nodes);
539 output.kv("Security Mode", &self.security_mode);
540 }
541 }
542
543 if is_verbose {
545 ctx.vprintln(format!(" Bind Address: {}", self.bind_addr));
546 ctx.vprintln(format!(" Endpoint Path: {}", self.endpoint_path));
547 ctx.vprintln(format!(" Max Subscriptions: 1000"));
548 ctx.vprintln(format!(" Max Monitored Items: 10000"));
549 }
550
551 if is_debug {
553 ctx.dprintln(format!(
554 "Full endpoint URL: opc.tcp://{}{}",
555 self.bind_addr, self.endpoint_path
556 ));
557 ctx.dprintln(format!("Node count: {}", self.nodes));
558 ctx.dprintln(format!("Security mode: {}", self.security_mode));
559 ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
560 }
561
562 self.start_server(ctx).await?;
563
564 if !is_quiet {
565 match format {
566 OutputFormat::Table => {
567 let colors_enabled = ctx.colors_enabled();
568 let table = TableBuilder::new(colors_enabled)
569 .header(["Namespace", "Nodes", "Subscriptions", "Status"])
570 .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
571 .status_row(
572 ["1", &self.nodes.to_string(), "0", "Online"],
573 StatusType::Success,
574 );
575 table.print();
576 }
577 _ => {
578 #[derive(Serialize)]
579 struct OpcuaServerInfo {
580 protocol: String,
581 endpoint: String,
582 nodes: usize,
583 security_mode: String,
584 namespaces: Vec<NamespaceInfo>,
585 status: String,
586 }
587 #[derive(Serialize)]
588 struct NamespaceInfo {
589 index: u32,
590 nodes: String,
591 subscriptions: u32,
592 status: String,
593 }
594 let info = OpcuaServerInfo {
595 protocol: "OPC UA".into(),
596 endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
597 nodes: self.nodes,
598 security_mode: self.security_mode.clone(),
599 namespaces: vec![
600 NamespaceInfo {
601 index: 0,
602 nodes: "Standard".into(),
603 subscriptions: 0,
604 status: "Ready".into(),
605 },
606 NamespaceInfo {
607 index: 1,
608 nodes: self.nodes.to_string(),
609 subscriptions: 0,
610 status: "Online".into(),
611 },
612 ],
613 status: "Online".into(),
614 };
615 let _ = ctx.output().write(&info);
616 }
617 }
618 }
619
620 if !is_quiet {
621 ctx.output().info("Press Ctrl+C to stop");
622 }
623 ctx.shutdown_signal().notified().await;
624
625 self.stop_server(ctx).await?;
626 if !is_quiet {
627 ctx.output().success("OPC UA simulator stopped");
628 }
629
630 Ok(CommandOutput::quiet_success())
631 }
632}
633
634#[async_trait]
635impl ProtocolCommand for OpcuaCommand {
636 fn protocol(&self) -> Protocol {
637 Protocol::OpcUa
638 }
639
640 fn default_port(&self) -> u16 {
641 4840
642 }
643
644 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
645 let output = ctx.output();
646 let spinner = output.spinner("Starting OPC UA server...");
647
648 let config = OpcUaServerConfig {
649 endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
650 server_name: "Mabinogion OPC UA Simulator".to_string(),
651 max_subscriptions: 1000,
652 max_monitored_items: 10000,
653 ..Default::default()
654 };
655
656 let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
657 crate::error::CliError::ExecutionFailed {
658 message: format!("Failed to create OPC UA server: {}", e),
659 }
660 })?);
661
662 let node_count = self.nodes.min(100);
665 for i in 0..node_count {
666 let node_id = format!("ns=2;i={}", 1000 + i);
667 let name = format!("Variable_{}", i);
668 let value = (i as f64) * 0.1;
669
670 if i % 2 == 0 {
671 let _ = server.add_writable_variable(node_id, name, value);
672 } else {
673 let _ = server.add_variable(node_id, name, value);
674 }
675 }
676
677 {
678 let mut server_guard = self.server.lock().await;
679 *server_guard = Some(server.clone());
680 }
681
682 let server_clone = server.clone();
683 let task = tokio::spawn(async move {
684 if let Err(e) = server_clone.start().await {
685 tracing::error!("OPC UA server error: {}", e);
686 }
687 });
688
689 {
690 let mut task_guard = self.server_task.lock().await;
691 *task_guard = Some(task);
692 }
693
694 tokio::time::sleep(Duration::from_millis(100)).await;
695
696 spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
697 Ok(())
698 }
699
700 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
701 if let Some(server) = self.server.lock().await.as_ref() {
702 let _ = server.stop().await;
703 }
704
705 if let Some(task) = self.server_task.lock().await.take() {
706 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
707 }
708
709 Ok(())
710 }
711}
712
/// Command that runs the BACnet/IP simulator.
///
/// Settings are supplied through the `with_*` builder methods. The running
/// server and the task driving it are stored behind async mutexes.
pub struct BacnetCommand {
    /// Socket address passed to the server configuration.
    bind_addr: SocketAddr,
    /// Device instance number passed to the server configuration.
    device_instance: u32,
    /// Requested total object count; divided among the object types.
    objects: usize,
    /// BBMD flag; shown in the output.
    bbmd_enabled: bool,
    /// Tags supplied via `with_tags`.
    tags: Tags,
    /// Handle to the running server, populated by `start_server`.
    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
    /// Join handle of the spawned task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
730
731impl BacnetCommand {
732 pub fn new() -> Self {
733 Self {
734 bind_addr: "0.0.0.0:47808".parse().unwrap(),
735 device_instance: 1234,
736 objects: 100,
737 bbmd_enabled: false,
738 tags: Tags::new(),
739 server: Arc::new(Mutex::new(None)),
740 server_task: Arc::new(Mutex::new(None)),
741 }
742 }
743
744 pub fn with_port(mut self, port: u16) -> Self {
745 self.bind_addr.set_port(port);
746 self
747 }
748
749 pub fn with_device_instance(mut self, instance: u32) -> Self {
750 self.device_instance = instance;
751 self
752 }
753
754 pub fn with_objects(mut self, objects: usize) -> Self {
755 self.objects = objects;
756 self
757 }
758
759 pub fn with_bbmd(mut self, enabled: bool) -> Self {
760 self.bbmd_enabled = enabled;
761 self
762 }
763
764 pub fn with_tags(mut self, tags: Tags) -> Self {
765 self.tags = tags;
766 self
767 }
768}
769
770impl Default for BacnetCommand {
771 fn default() -> Self {
772 Self::new()
773 }
774}
775
776#[async_trait]
777impl Command for BacnetCommand {
778 fn name(&self) -> &str {
779 "bacnet"
780 }
781
782 fn description(&self) -> &str {
783 "Start a BACnet/IP simulator"
784 }
785
786 fn requires_engine(&self) -> bool {
787 true
788 }
789
790 fn supports_shutdown(&self) -> bool {
791 true
792 }
793
794 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
795 let format = ctx.output().format();
796 let is_quiet = ctx.is_quiet();
797 let is_verbose = ctx.is_verbose();
798 let is_debug = ctx.is_debug();
799
800 if !is_quiet {
801 if matches!(format, OutputFormat::Table) {
802 let output = ctx.output();
803 output.header("BACnet/IP Simulator");
804 output.kv("Bind Address", self.bind_addr);
805 output.kv("Device Instance", self.device_instance);
806 output.kv("Objects", self.objects);
807 output.kv(
808 "BBMD",
809 if self.bbmd_enabled {
810 "Enabled"
811 } else {
812 "Disabled"
813 },
814 );
815 }
816 }
817
818 if is_verbose {
820 let per_type = self.objects / 4;
821 ctx.vprintln(format!(
822 " Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})",
823 per_type, per_type, per_type, per_type, per_type
824 ));
825 ctx.vprintln(format!(" Device Name: Mabinogion BACnet Simulator"));
826 }
827
828 if is_debug {
830 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
831 ctx.dprintln(format!("Device instance: {}", self.device_instance));
832 ctx.dprintln(format!(
833 "Total objects: {}, BBMD: {}",
834 self.objects, self.bbmd_enabled
835 ));
836 }
837
838 self.start_server(ctx).await?;
839
840 let per_type = self.objects / 4;
841
842 if !is_quiet {
843 match format {
844 OutputFormat::Table => {
845 let colors_enabled = ctx.colors_enabled();
846 let table = TableBuilder::new(colors_enabled)
847 .header(["Object Type", "Count", "Status"])
848 .status_row(["Device", "1", "Online"], StatusType::Success)
849 .status_row(
850 ["Analog Input", &per_type.to_string(), "Active"],
851 StatusType::Success,
852 )
853 .status_row(
854 ["Analog Output", &per_type.to_string(), "Active"],
855 StatusType::Success,
856 )
857 .status_row(
858 ["Binary Input", &per_type.to_string(), "Active"],
859 StatusType::Success,
860 )
861 .status_row(
862 ["Binary Output", &per_type.to_string(), "Active"],
863 StatusType::Success,
864 );
865 table.print();
866 }
867 _ => {
868 #[derive(Serialize)]
869 struct BacnetServerInfo {
870 protocol: String,
871 bind_address: String,
872 device_instance: u32,
873 objects: usize,
874 bbmd_enabled: bool,
875 object_types: Vec<ObjectTypeInfo>,
876 status: String,
877 }
878 #[derive(Serialize)]
879 struct ObjectTypeInfo {
880 object_type: String,
881 count: usize,
882 status: String,
883 }
884 let info = BacnetServerInfo {
885 protocol: "BACnet/IP".into(),
886 bind_address: self.bind_addr.to_string(),
887 device_instance: self.device_instance,
888 objects: self.objects,
889 bbmd_enabled: self.bbmd_enabled,
890 object_types: vec![
891 ObjectTypeInfo {
892 object_type: "Device".into(),
893 count: 1,
894 status: "Online".into(),
895 },
896 ObjectTypeInfo {
897 object_type: "Analog Input".into(),
898 count: per_type,
899 status: "Active".into(),
900 },
901 ObjectTypeInfo {
902 object_type: "Analog Output".into(),
903 count: per_type,
904 status: "Active".into(),
905 },
906 ObjectTypeInfo {
907 object_type: "Binary Input".into(),
908 count: per_type,
909 status: "Active".into(),
910 },
911 ObjectTypeInfo {
912 object_type: "Binary Output".into(),
913 count: per_type,
914 status: "Active".into(),
915 },
916 ],
917 status: "Online".into(),
918 };
919 let _ = ctx.output().write(&info);
920 }
921 }
922 }
923
924 if !is_quiet {
925 ctx.output().info("Press Ctrl+C to stop");
926 }
927 ctx.shutdown_signal().notified().await;
928
929 self.stop_server(ctx).await?;
930 if !is_quiet {
931 ctx.output().success("BACnet simulator stopped");
932 }
933
934 Ok(CommandOutput::quiet_success())
935 }
936}
937
938#[async_trait]
939impl ProtocolCommand for BacnetCommand {
940 fn protocol(&self) -> Protocol {
941 Protocol::BacnetIp
942 }
943
944 fn default_port(&self) -> u16 {
945 47808
946 }
947
948 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
949 let output = ctx.output();
950 let spinner = output.spinner("Starting BACnet server...");
951
952 let config = BacnetServerConfig::new(self.device_instance)
953 .with_bind_addr(self.bind_addr)
954 .with_device_name("Mabinogion BACnet Simulator");
955
956 let registry = ObjectRegistry::new();
958
959 let descriptors = default_object_descriptors();
960 let objects_per_type = self.objects / descriptors.len();
961 registry.populate_standard_objects(&descriptors, objects_per_type);
962
963 let server = Arc::new(BACnetServer::new(config, registry));
964
965 {
966 let mut server_guard = self.server.lock().await;
967 *server_guard = Some(server.clone());
968 }
969
970 let server_clone = server.clone();
971 let task = tokio::spawn(async move {
972 if let Err(e) = server_clone.run().await {
973 tracing::error!("BACnet server error: {}", e);
974 }
975 });
976
977 {
978 let mut task_guard = self.server_task.lock().await;
979 *task_guard = Some(task);
980 }
981
982 tokio::time::sleep(Duration::from_millis(100)).await;
983
984 spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
985 Ok(())
986 }
987
988 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
989 if let Some(server) = self.server.lock().await.as_ref() {
990 server.shutdown();
991 }
992
993 if let Some(task) = self.server_task.lock().await.take() {
994 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
995 }
996
997 Ok(())
998 }
999}
1000
/// Command that runs the KNXnet/IP simulator.
///
/// Settings are supplied through the `with_*` builder methods. The running
/// server and the task driving it are stored behind async mutexes.
pub struct KnxCommand {
    /// Socket address passed to the server configuration.
    bind_addr: SocketAddr,
    /// Individual address in textual form; parsed when the server starts.
    individual_address: String,
    /// Number of group objects to create in the group object table.
    group_objects: usize,
    /// Tags supplied via `with_tags`.
    tags: Tags,
    /// Handle to the running server, populated by `start_server`.
    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
    /// Join handle of the spawned task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
1017
1018impl KnxCommand {
1019 pub fn new() -> Self {
1020 Self {
1021 bind_addr: "0.0.0.0:3671".parse().unwrap(),
1022 individual_address: "1.1.1".into(),
1023 group_objects: 100,
1024 tags: Tags::new(),
1025 server: Arc::new(Mutex::new(None)),
1026 server_task: Arc::new(Mutex::new(None)),
1027 }
1028 }
1029
1030 pub fn with_port(mut self, port: u16) -> Self {
1031 self.bind_addr.set_port(port);
1032 self
1033 }
1034
1035 pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
1036 self.individual_address = addr.into();
1037 self
1038 }
1039
1040 pub fn with_group_objects(mut self, count: usize) -> Self {
1041 self.group_objects = count;
1042 self
1043 }
1044
1045 pub fn with_tags(mut self, tags: Tags) -> Self {
1046 self.tags = tags;
1047 self
1048 }
1049}
1050
1051impl Default for KnxCommand {
1052 fn default() -> Self {
1053 Self::new()
1054 }
1055}
1056
1057#[async_trait]
1058impl Command for KnxCommand {
1059 fn name(&self) -> &str {
1060 "knx"
1061 }
1062
1063 fn description(&self) -> &str {
1064 "Start a KNXnet/IP simulator"
1065 }
1066
1067 fn requires_engine(&self) -> bool {
1068 true
1069 }
1070
1071 fn supports_shutdown(&self) -> bool {
1072 true
1073 }
1074
1075 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
1076 let format = ctx.output().format();
1077 let is_quiet = ctx.is_quiet();
1078 let is_verbose = ctx.is_verbose();
1079 let is_debug = ctx.is_debug();
1080
1081 if !is_quiet {
1082 if matches!(format, OutputFormat::Table) {
1083 let output = ctx.output();
1084 output.header("KNXnet/IP Simulator");
1085 output.kv("Bind Address", self.bind_addr);
1086 output.kv("Individual Address", &self.individual_address);
1087 output.kv("Group Objects", self.group_objects);
1088 }
1089 }
1090
1091 if is_verbose {
1093 ctx.vprintln(format!(" Max Connections: 10"));
1094 ctx.vprintln(format!(" Services: Core, Device Management, Tunneling"));
1095 }
1096
1097 if is_debug {
1099 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
1100 ctx.dprintln(format!("Individual address: {}", self.individual_address));
1101 ctx.dprintln(format!("Group objects: {}", self.group_objects));
1102 }
1103
1104 self.start_server(ctx).await?;
1105
1106 if !is_quiet {
1107 match format {
1108 OutputFormat::Table => {
1109 let colors_enabled = ctx.colors_enabled();
1110 let table = TableBuilder::new(colors_enabled)
1111 .header(["Service", "Status"])
1112 .status_row(["Core", "Ready"], StatusType::Success)
1113 .status_row(["Device Management", "Ready"], StatusType::Success)
1114 .status_row(["Tunneling", "Ready"], StatusType::Success);
1115 table.print();
1116 }
1117 _ => {
1118 #[derive(Serialize)]
1119 struct KnxServerInfo {
1120 protocol: String,
1121 bind_address: String,
1122 individual_address: String,
1123 group_objects: usize,
1124 services: Vec<ServiceInfo>,
1125 status: String,
1126 }
1127 #[derive(Serialize)]
1128 struct ServiceInfo {
1129 service: String,
1130 status: String,
1131 }
1132 let info = KnxServerInfo {
1133 protocol: "KNXnet/IP".into(),
1134 bind_address: self.bind_addr.to_string(),
1135 individual_address: self.individual_address.clone(),
1136 group_objects: self.group_objects,
1137 services: vec![
1138 ServiceInfo {
1139 service: "Core".into(),
1140 status: "Ready".into(),
1141 },
1142 ServiceInfo {
1143 service: "Device Management".into(),
1144 status: "Ready".into(),
1145 },
1146 ServiceInfo {
1147 service: "Tunneling".into(),
1148 status: "Ready".into(),
1149 },
1150 ],
1151 status: "Online".into(),
1152 };
1153 let _ = ctx.output().write(&info);
1154 }
1155 }
1156 }
1157
1158 if !is_quiet {
1159 ctx.output().info("Press Ctrl+C to stop");
1160 }
1161 ctx.shutdown_signal().notified().await;
1162
1163 self.stop_server(ctx).await?;
1164 if !is_quiet {
1165 ctx.output().success("KNX simulator stopped");
1166 }
1167
1168 Ok(CommandOutput::quiet_success())
1169 }
1170}
1171
1172#[async_trait]
1173impl ProtocolCommand for KnxCommand {
1174 fn protocol(&self) -> Protocol {
1175 Protocol::KnxIp
1176 }
1177
1178 fn default_port(&self) -> u16 {
1179 3671
1180 }
1181
1182 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1183 let output = ctx.output();
1184 let spinner = output.spinner("Starting KNX server...");
1185
1186 let individual_address: IndividualAddress =
1188 self.individual_address.parse().map_err(|_| {
1189 crate::error::CliError::ExecutionFailed {
1190 message: format!("Invalid individual address: {}", self.individual_address),
1191 }
1192 })?;
1193
1194 let config = KnxServerConfig {
1195 bind_addr: self.bind_addr,
1196 individual_address,
1197 max_connections: 256,
1198 ..Default::default()
1199 };
1200
1201 let group_table = Arc::new(GroupObjectTable::new());
1203 let dpt_types = [
1204 DptId::new(1, 1), DptId::new(5, 1), DptId::new(9, 1), DptId::new(9, 4), DptId::new(9, 7), DptId::new(12, 1), DptId::new(13, 1), DptId::new(14, 56), ];
1213 let dpt_names = [
1214 "Switch",
1215 "Scaling",
1216 "Temperature",
1217 "Lux",
1218 "Humidity",
1219 "Counter",
1220 "SignedCounter",
1221 "Float",
1222 ];
1223
1224 for i in 0..self.group_objects {
1225 let main = ((i / 256) + 1) as u8;
1226 let middle = ((i / 8) % 8) as u8;
1227 let sub = (i % 256) as u8;
1228 let addr = GroupAddress::three_level(main, middle, sub);
1229 let dpt_idx = i % dpt_types.len();
1230 let name = format!("{}_{}", dpt_names[dpt_idx], i);
1231 if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1232 tracing::warn!("Failed to create group object {}: {}", i, e);
1233 }
1234 }
1235
1236 let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1237
1238 {
1239 let mut server_guard = self.server.lock().await;
1240 *server_guard = Some(server.clone());
1241 }
1242
1243 let server_clone = server.clone();
1244 let task = tokio::spawn(async move {
1245 if let Err(e) = server_clone.start().await {
1246 tracing::error!("KNX server error: {}", e);
1247 }
1248 });
1249
1250 {
1251 let mut task_guard = self.server_task.lock().await;
1252 *task_guard = Some(task);
1253 }
1254
1255 tokio::time::sleep(Duration::from_millis(100)).await;
1256
1257 spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1258 Ok(())
1259 }
1260
1261 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1262 let server_opt = self.server.lock().await.take();
1264 if let Some(server) = server_opt {
1265 let _ = tokio::task::spawn_blocking(move || {
1267 let rt = tokio::runtime::Handle::current();
1268 rt.block_on(async {
1269 let _ = server.stop().await;
1270 })
1271 })
1272 .await;
1273 }
1274
1275 if let Some(task) = self.server_task.lock().await.take() {
1276 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1277 }
1278
1279 Ok(())
1280 }
1281}