1use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19use mabi_bacnet::prelude::{
21 default_object_descriptors, BACnetServer, ObjectRegistry, ServerConfig as BacnetServerConfig,
22};
23use mabi_knx::{
24 DptId, GroupAddress, GroupObjectTable, IndividualAddress, KnxServer, KnxServerConfig,
25};
26use mabi_modbus::{tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig, ModbusTcpServerV2};
27use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
28
29async fn check_port_availability(addr: SocketAddr) {
34 use tokio::io::{AsyncReadExt, AsyncWriteExt};
35 use tokio::net::TcpStream;
36
37 let connect_result =
39 tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await;
40
41 match connect_result {
42 Ok(Ok(_first_stream)) => {
43 drop(_first_stream);
45
46 let probe_result = tokio::time::timeout(Duration::from_secs(1), async {
47 let mut stream = TcpStream::connect(addr).await?;
48 let request: [u8; 12] = [
50 0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x03, 0x00, 0x00, 0x00, 0x01,
51 ];
52 stream.write_all(&request).await?;
53 let mut response = [0u8; 32];
54 let n = stream.read(&mut response).await?;
55 Ok::<_, std::io::Error>(n)
56 })
57 .await;
58
59 match probe_result {
60 Ok(Ok(n)) if n >= 7 => {
61 tracing::warn!(
62 port = addr.port(),
63 "Port {} is already in use by a responding Modbus server. \
64 The new server will fail to bind.",
65 addr.port()
66 );
67 }
68 _ => {
69 tracing::warn!(
71 port = addr.port(),
72 "Port {} is in use: TCP connects but no Modbus response. \
73 This may be a suspended (zombie) process holding the port.\n \
74 Diagnostic: lsof -i :{} | grep LISTEN\n \
75 To kill: kill $(lsof -ti :{} -sTCP:LISTEN)",
76 addr.port(),
77 addr.port(),
78 addr.port()
79 );
80 }
81 }
82 }
83 Ok(Err(_)) | Err(_) => {
84 tracing::debug!(port = addr.port(), "Port {} is available", addr.port());
86 }
87 }
88}
89
/// Interface shared by the protocol simulator commands in this file.
///
/// Extends [`Command`] with protocol identification and server lifecycle
/// hooks that each command's `execute` calls around its wait-for-shutdown.
#[async_trait]
pub trait ProtocolCommand: Command {
    /// Returns the protocol simulated by this command.
    fn protocol(&self) -> Protocol;

    /// Returns the default port for the protocol.
    fn default_port(&self) -> u16;

    /// Creates and starts the protocol server.
    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;

    /// Stops the server previously started by `start_server`.
    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
}
105
/// Command that runs the Modbus simulator.
pub struct ModbusCommand {
    /// Socket address the TCP server binds to.
    bind_addr: SocketAddr,
    /// Number of simulated devices; device `i` (zero-based) gets unit ID `i + 1`.
    devices: usize,
    /// Total points per device; a quarter goes to each of the four register types.
    points_per_device: usize,
    /// Set by `with_rtu_mode`; selects the RTU labels and `Protocol::ModbusRtu`.
    rtu_mode: bool,
    /// Serial port name recorded by `with_rtu_mode`.
    serial_port: Option<String>,
    /// Tags cloned into every device configuration.
    tags: Tags,
    /// Server instance stored by `start_server` and used by `stop_server`.
    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
    /// Handle of the spawned task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
129
130impl ModbusCommand {
131 pub fn new() -> Self {
132 Self {
133 bind_addr: "0.0.0.0:502".parse().unwrap(),
134 devices: 1,
135 points_per_device: 100,
136 rtu_mode: false,
137 serial_port: None,
138 tags: Tags::new(),
139 server: Arc::new(Mutex::new(None)),
140 server_task: Arc::new(Mutex::new(None)),
141 }
142 }
143
144 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
145 self.bind_addr = addr;
146 self
147 }
148
149 pub fn with_port(mut self, port: u16) -> Self {
150 self.bind_addr.set_port(port);
151 self
152 }
153
154 pub fn with_devices(mut self, devices: usize) -> Self {
155 self.devices = devices;
156 self
157 }
158
159 pub fn with_points(mut self, points: usize) -> Self {
160 self.points_per_device = points;
161 self
162 }
163
164 pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
165 self.rtu_mode = true;
166 self.serial_port = Some(serial_port.into());
167 self
168 }
169
170 pub fn with_tags(mut self, tags: Tags) -> Self {
171 self.tags = tags;
172 self
173 }
174}
175
176impl Default for ModbusCommand {
177 fn default() -> Self {
178 Self::new()
179 }
180}
181
182#[async_trait]
183impl Command for ModbusCommand {
184 fn name(&self) -> &str {
185 "modbus"
186 }
187
188 fn description(&self) -> &str {
189 "Start a Modbus TCP/RTU simulator"
190 }
191
192 fn requires_engine(&self) -> bool {
193 true
194 }
195
196 fn supports_shutdown(&self) -> bool {
197 true
198 }
199
200 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
201 let format = ctx.output().format();
202 let is_quiet = ctx.is_quiet();
203 let is_verbose = ctx.is_verbose();
204 let is_debug = ctx.is_debug();
205
206 if !is_quiet {
207 if matches!(format, OutputFormat::Table) {
208 let output = ctx.output();
209 if self.rtu_mode {
210 output.header("Modbus RTU Simulator");
211 output.kv("Serial Port", self.serial_port.as_deref().unwrap_or("N/A"));
212 } else {
213 output.header("Modbus TCP Simulator");
214 output.kv("Bind Address", self.bind_addr);
215 }
216 output.kv("Devices", self.devices);
217 output.kv("Points per Device", self.points_per_device);
218 output.kv("Total Points", self.devices * self.points_per_device);
219 }
220 }
221
222 if is_verbose {
224 ctx.vprintln(format!(
225 " Protocol Mode: {}",
226 if self.rtu_mode { "RTU" } else { "TCP" }
227 ));
228 ctx.vprintln(format!(
229 " Points Distribution: {} per register type",
230 self.points_per_device / 4
231 ));
232 }
233
234 if is_debug {
236 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
237 ctx.dprintln(format!(
238 "RTU mode: {}, Serial: {:?}",
239 self.rtu_mode, self.serial_port
240 ));
241 ctx.dprintln(format!(
242 "Devices: {}, Points/device: {}",
243 self.devices, self.points_per_device
244 ));
245 }
246
247 self.start_server(ctx).await?;
248
249 let points_per_type = self.points_per_device / 4;
250
251 if !is_quiet {
252 match format {
253 OutputFormat::Table => {
254 let colors_enabled = ctx.colors_enabled();
255 let builder = TableBuilder::new(colors_enabled).header([
256 "Unit ID",
257 "Holding Regs",
258 "Input Regs",
259 "Coils",
260 "Discrete",
261 "Status",
262 ]);
263
264 let devices = self.devices;
265 let pts = points_per_type.to_string();
266 let table = PaginatedTable::default().render(builder, devices, 6, |i| {
267 let unit_id = (i + 1).to_string();
268 (
269 vec![
270 unit_id,
271 pts.clone(),
272 pts.clone(),
273 pts.clone(),
274 pts.clone(),
275 "Online".into(),
276 ],
277 StatusType::Success,
278 )
279 });
280 table.print();
281 }
282 _ => {
283 #[derive(Serialize)]
284 struct ModbusServerInfo {
285 protocol: String,
286 bind_address: String,
287 devices: usize,
288 points_per_device: usize,
289 total_points: usize,
290 rtu_mode: bool,
291 serial_port: Option<String>,
292 device_list: Vec<ModbusDeviceInfo>,
293 status: String,
294 }
295 #[derive(Serialize)]
296 struct ModbusDeviceInfo {
297 unit_id: usize,
298 holding_registers: usize,
299 input_registers: usize,
300 coils: usize,
301 discrete_inputs: usize,
302 status: String,
303 }
304 let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
305 .map(|i| ModbusDeviceInfo {
306 unit_id: i + 1,
307 holding_registers: points_per_type,
308 input_registers: points_per_type,
309 coils: points_per_type,
310 discrete_inputs: points_per_type,
311 status: "Online".into(),
312 })
313 .collect();
314 let info = ModbusServerInfo {
315 protocol: if self.rtu_mode {
316 "Modbus RTU".into()
317 } else {
318 "Modbus TCP".into()
319 },
320 bind_address: self.bind_addr.to_string(),
321 devices: self.devices,
322 points_per_device: self.points_per_device,
323 total_points: self.devices * self.points_per_device,
324 rtu_mode: self.rtu_mode,
325 serial_port: self.serial_port.clone(),
326 device_list,
327 status: "Online".into(),
328 };
329 let _ = ctx.output().write(&info);
330 }
331 }
332 }
333
334 if !is_quiet {
335 ctx.output().info("Press Ctrl+C to stop");
336 }
337 ctx.shutdown_signal().notified().await;
338
339 self.stop_server(ctx).await?;
340 if !is_quiet {
341 ctx.output().success("Modbus simulator stopped");
342 }
343
344 Ok(CommandOutput::quiet_success())
345 }
346}
347
348#[async_trait]
349impl ProtocolCommand for ModbusCommand {
350 fn protocol(&self) -> Protocol {
351 if self.rtu_mode {
352 Protocol::ModbusRtu
353 } else {
354 Protocol::ModbusTcp
355 }
356 }
357
358 fn default_port(&self) -> u16 {
359 502
360 }
361
362 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
363 let output = ctx.output();
364
365 check_port_availability(self.bind_addr).await;
367
368 let spinner = output.spinner("Starting Modbus server...");
369
370 let config = ServerConfigV2 {
371 bind_address: self.bind_addr,
372 ..Default::default()
373 };
374
375 let server = Arc::new(ModbusTcpServerV2::new(config));
376
377 for i in 0..self.devices {
378 let unit_id = (i + 1) as u8;
379 let points = (self.points_per_device / 4) as u16;
380 let device_config = ModbusDeviceConfig {
381 unit_id,
382 name: format!("Device-{}", unit_id),
383 holding_registers: points,
384 input_registers: points,
385 coils: points,
386 discrete_inputs: points,
387 response_delay_ms: 0,
388 tags: self.tags.clone(),
389 };
390 let device = ModbusDevice::new(device_config);
391 server.add_device(device);
392 }
393
394 {
395 let mut server_guard = self.server.lock().await;
396 *server_guard = Some(server.clone());
397 }
398
399 let server_clone = server.clone();
400 let task = tokio::spawn(async move {
401 if let Err(e) = server_clone.run().await {
402 tracing::error!("Modbus server error: {}", e);
403 }
404 });
405
406 {
407 let mut task_guard = self.server_task.lock().await;
408 *task_guard = Some(task);
409 }
410
411 tokio::time::sleep(Duration::from_millis(100)).await;
412
413 {
415 let task_guard = self.server_task.lock().await;
416 if let Some(task) = task_guard.as_ref() {
417 if task.is_finished() {
418 spinner.finish_with_message("Failed to start server");
419 return Err(crate::error::CliError::PortInUse {
420 port: self.bind_addr.port(),
421 });
422 }
423 }
424 }
425
426 spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
427 Ok(())
428 }
429
430 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
431 if let Some(server) = self.server.lock().await.as_ref() {
432 server.shutdown();
433 }
434
435 if let Some(task) = self.server_task.lock().await.take() {
436 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
437 }
438
439 Ok(())
440 }
441}
442
/// Command that runs the OPC UA server simulator.
pub struct OpcuaCommand {
    /// Socket address used to build the `opc.tcp://` endpoint URL.
    bind_addr: SocketAddr,
    /// Path appended to the bind address in the endpoint URL.
    endpoint_path: String,
    /// Requested node count as shown in the output; `start_server` creates at most 100.
    nodes: usize,
    /// Security mode name shown in the command output.
    security_mode: String,
    /// Tags supplied through `with_tags`.
    tags: Tags,
    /// Server instance stored by `start_server` and used by `stop_server`.
    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
    /// Handle of the spawned task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
460
461impl OpcuaCommand {
462 pub fn new() -> Self {
463 Self {
464 bind_addr: "0.0.0.0:4840".parse().unwrap(),
465 endpoint_path: "/".into(),
466 nodes: 1000,
467 security_mode: "None".into(),
468 tags: Tags::new(),
469 server: Arc::new(Mutex::new(None)),
470 server_task: Arc::new(Mutex::new(None)),
471 }
472 }
473
474 pub fn with_port(mut self, port: u16) -> Self {
475 self.bind_addr.set_port(port);
476 self
477 }
478
479 pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
480 self.endpoint_path = path.into();
481 self
482 }
483
484 pub fn with_nodes(mut self, nodes: usize) -> Self {
485 self.nodes = nodes;
486 self
487 }
488
489 pub fn with_security(mut self, mode: impl Into<String>) -> Self {
490 self.security_mode = mode.into();
491 self
492 }
493
494 pub fn with_tags(mut self, tags: Tags) -> Self {
495 self.tags = tags;
496 self
497 }
498}
499
500impl Default for OpcuaCommand {
501 fn default() -> Self {
502 Self::new()
503 }
504}
505
506#[async_trait]
507impl Command for OpcuaCommand {
508 fn name(&self) -> &str {
509 "opcua"
510 }
511
512 fn description(&self) -> &str {
513 "Start an OPC UA server simulator"
514 }
515
516 fn requires_engine(&self) -> bool {
517 true
518 }
519
520 fn supports_shutdown(&self) -> bool {
521 true
522 }
523
524 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
525 let format = ctx.output().format();
526 let is_quiet = ctx.is_quiet();
527 let is_verbose = ctx.is_verbose();
528 let is_debug = ctx.is_debug();
529
530 if !is_quiet {
531 if matches!(format, OutputFormat::Table) {
532 let output = ctx.output();
533 output.header("OPC UA Simulator");
534 output.kv(
535 "Endpoint",
536 format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
537 );
538 output.kv("Nodes", self.nodes);
539 output.kv("Security Mode", &self.security_mode);
540 }
541 }
542
543 if is_verbose {
545 ctx.vprintln(format!(" Bind Address: {}", self.bind_addr));
546 ctx.vprintln(format!(" Endpoint Path: {}", self.endpoint_path));
547 ctx.vprintln(format!(" Max Subscriptions: 1000"));
548 ctx.vprintln(format!(" Max Monitored Items: 10000"));
549 }
550
551 if is_debug {
553 ctx.dprintln(format!(
554 "Full endpoint URL: opc.tcp://{}{}",
555 self.bind_addr, self.endpoint_path
556 ));
557 ctx.dprintln(format!("Node count: {}", self.nodes));
558 ctx.dprintln(format!("Security mode: {}", self.security_mode));
559 ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
560 }
561
562 self.start_server(ctx).await?;
563
564 if !is_quiet {
565 match format {
566 OutputFormat::Table => {
567 let colors_enabled = ctx.colors_enabled();
568 let table = TableBuilder::new(colors_enabled)
569 .header(["Namespace", "Nodes", "Subscriptions", "Status"])
570 .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
571 .status_row(
572 ["1", &self.nodes.to_string(), "0", "Online"],
573 StatusType::Success,
574 );
575 table.print();
576 }
577 _ => {
578 #[derive(Serialize)]
579 struct OpcuaServerInfo {
580 protocol: String,
581 endpoint: String,
582 nodes: usize,
583 security_mode: String,
584 namespaces: Vec<NamespaceInfo>,
585 status: String,
586 }
587 #[derive(Serialize)]
588 struct NamespaceInfo {
589 index: u32,
590 nodes: String,
591 subscriptions: u32,
592 status: String,
593 }
594 let info = OpcuaServerInfo {
595 protocol: "OPC UA".into(),
596 endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
597 nodes: self.nodes,
598 security_mode: self.security_mode.clone(),
599 namespaces: vec![
600 NamespaceInfo {
601 index: 0,
602 nodes: "Standard".into(),
603 subscriptions: 0,
604 status: "Ready".into(),
605 },
606 NamespaceInfo {
607 index: 1,
608 nodes: self.nodes.to_string(),
609 subscriptions: 0,
610 status: "Online".into(),
611 },
612 ],
613 status: "Online".into(),
614 };
615 let _ = ctx.output().write(&info);
616 }
617 }
618 }
619
620 if !is_quiet {
621 ctx.output().info("Press Ctrl+C to stop");
622 }
623 ctx.shutdown_signal().notified().await;
624
625 self.stop_server(ctx).await?;
626 if !is_quiet {
627 ctx.output().success("OPC UA simulator stopped");
628 }
629
630 Ok(CommandOutput::quiet_success())
631 }
632}
633
634#[async_trait]
635impl ProtocolCommand for OpcuaCommand {
636 fn protocol(&self) -> Protocol {
637 Protocol::OpcUa
638 }
639
640 fn default_port(&self) -> u16 {
641 4840
642 }
643
644 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
645 let output = ctx.output();
646 let spinner = output.spinner("Starting OPC UA server...");
647
648 let config = OpcUaServerConfig {
649 endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
650 server_name: "Mabinogion OPC UA Simulator".to_string(),
651 max_subscriptions: 1000,
652 max_monitored_items: 10000,
653 ..Default::default()
654 };
655
656 let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
657 crate::error::CliError::ExecutionFailed {
658 message: format!("Failed to create OPC UA server: {}", e),
659 }
660 })?);
661
662 let node_count = self.nodes.min(100);
665 for i in 0..node_count {
666 let node_id = format!("ns=2;i={}", 1000 + i);
667 let name = format!("Variable_{}", i);
668 let value = (i as f64) * 0.1;
669
670 if i % 2 == 0 {
671 let _ = server.add_writable_variable(node_id, name, value);
672 } else {
673 let _ = server.add_variable(node_id, name, value);
674 }
675 }
676
677 {
678 let mut server_guard = self.server.lock().await;
679 *server_guard = Some(server.clone());
680 }
681
682 let server_clone = server.clone();
683 let task = tokio::spawn(async move {
684 if let Err(e) = server_clone.start().await {
685 tracing::error!("OPC UA server error: {}", e);
686 }
687 });
688
689 {
690 let mut task_guard = self.server_task.lock().await;
691 *task_guard = Some(task);
692 }
693
694 tokio::time::sleep(Duration::from_millis(100)).await;
695
696 spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
697 Ok(())
698 }
699
700 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
701 if let Some(server) = self.server.lock().await.as_ref() {
702 let _ = server.stop().await;
703 }
704
705 if let Some(task) = self.server_task.lock().await.take() {
706 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
707 }
708
709 Ok(())
710 }
711}
712
/// Command that runs the BACnet/IP simulator.
pub struct BacnetCommand {
    /// Socket address the server binds to.
    bind_addr: SocketAddr,
    /// Device instance number passed to the server configuration.
    device_instance: u32,
    /// Requested number of demo objects; `0` means no demo objects are created.
    objects: usize,
    /// BBMD flag shown in the command output.
    bbmd_enabled: bool,
    /// Tags supplied through `with_tags`.
    tags: Tags,
    /// Server instance stored by `start_server` and used by `stop_server`.
    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
    /// Handle of the spawned task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
730
731impl BacnetCommand {
732 pub fn new() -> Self {
733 Self {
734 bind_addr: "0.0.0.0:47808".parse().unwrap(),
735 device_instance: 1234,
736 objects: 0,
737 bbmd_enabled: false,
738 tags: Tags::new(),
739 server: Arc::new(Mutex::new(None)),
740 server_task: Arc::new(Mutex::new(None)),
741 }
742 }
743
744 pub fn with_port(mut self, port: u16) -> Self {
745 self.bind_addr.set_port(port);
746 self
747 }
748
749 pub fn with_device_instance(mut self, instance: u32) -> Self {
750 self.device_instance = instance;
751 self
752 }
753
754 pub fn with_objects(mut self, objects: usize) -> Self {
755 self.objects = objects;
756 self
757 }
758
759 pub fn with_bbmd(mut self, enabled: bool) -> Self {
760 self.bbmd_enabled = enabled;
761 self
762 }
763
764 pub fn with_tags(mut self, tags: Tags) -> Self {
765 self.tags = tags;
766 self
767 }
768}
769
770impl Default for BacnetCommand {
771 fn default() -> Self {
772 Self::new()
773 }
774}
775
776#[async_trait]
777impl Command for BacnetCommand {
778 fn name(&self) -> &str {
779 "bacnet"
780 }
781
782 fn description(&self) -> &str {
783 "Start a BACnet/IP simulator"
784 }
785
786 fn requires_engine(&self) -> bool {
787 true
788 }
789
790 fn supports_shutdown(&self) -> bool {
791 true
792 }
793
794 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
795 let format = ctx.output().format();
796 let is_quiet = ctx.is_quiet();
797 let is_verbose = ctx.is_verbose();
798 let is_debug = ctx.is_debug();
799
800 if !is_quiet {
801 if matches!(format, OutputFormat::Table) {
802 let output = ctx.output();
803 output.header("BACnet/IP Simulator");
804 output.kv("Bind Address", self.bind_addr);
805 output.kv("Device Instance", self.device_instance);
806 output.kv("Objects", self.objects);
807 output.kv(
808 "BBMD",
809 if self.bbmd_enabled {
810 "Enabled"
811 } else {
812 "Disabled"
813 },
814 );
815 }
816 }
817
818 if is_verbose {
820 let per_type = if self.objects > 0 {
821 std::cmp::max(1, self.objects / 4)
822 } else {
823 0
824 };
825 if per_type == 0 {
826 ctx.vprintln(" Demo Objects: disabled (Device object only)");
827 } else {
828 ctx.vprintln(format!(
829 " Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})",
830 per_type, per_type, per_type, per_type, per_type
831 ));
832 }
833 ctx.vprintln(format!(" Device Name: Mabinogion BACnet Simulator"));
834 }
835
836 if is_debug {
838 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
839 ctx.dprintln(format!("Device instance: {}", self.device_instance));
840 ctx.dprintln(format!(
841 "Total objects: {}, BBMD: {}",
842 self.objects, self.bbmd_enabled
843 ));
844 }
845
846 self.start_server(ctx).await?;
847
848 let per_type = if self.objects > 0 {
849 std::cmp::max(1, self.objects / 4)
850 } else {
851 0
852 };
853
854 if !is_quiet {
855 match format {
856 OutputFormat::Table => {
857 let colors_enabled = ctx.colors_enabled();
858 let demo_status = if per_type == 0 {
859 "Not created"
860 } else {
861 "Active"
862 };
863 let table = TableBuilder::new(colors_enabled)
864 .header(["Object Type", "Count", "Status"])
865 .status_row(["Device", "1", "Online"], StatusType::Success)
866 .status_row(
867 ["Analog Input", &per_type.to_string(), demo_status],
868 StatusType::Success,
869 )
870 .status_row(
871 ["Analog Output", &per_type.to_string(), demo_status],
872 StatusType::Success,
873 )
874 .status_row(
875 ["Binary Input", &per_type.to_string(), demo_status],
876 StatusType::Success,
877 )
878 .status_row(
879 ["Binary Output", &per_type.to_string(), demo_status],
880 StatusType::Success,
881 );
882 table.print();
883 }
884 _ => {
885 #[derive(Serialize)]
886 struct BacnetServerInfo {
887 protocol: String,
888 bind_address: String,
889 device_instance: u32,
890 objects: usize,
891 bbmd_enabled: bool,
892 object_types: Vec<ObjectTypeInfo>,
893 status: String,
894 }
895 #[derive(Serialize)]
896 struct ObjectTypeInfo {
897 object_type: String,
898 count: usize,
899 status: String,
900 }
901 let info = BacnetServerInfo {
902 protocol: "BACnet/IP".into(),
903 bind_address: self.bind_addr.to_string(),
904 device_instance: self.device_instance,
905 objects: self.objects,
906 bbmd_enabled: self.bbmd_enabled,
907 object_types: vec![
908 ObjectTypeInfo {
909 object_type: "Device".into(),
910 count: 1,
911 status: "Online".into(),
912 },
913 ObjectTypeInfo {
914 object_type: "Analog Input".into(),
915 count: per_type,
916 status: if per_type == 0 {
917 "Not created".into()
918 } else {
919 "Active".into()
920 },
921 },
922 ObjectTypeInfo {
923 object_type: "Analog Output".into(),
924 count: per_type,
925 status: if per_type == 0 {
926 "Not created".into()
927 } else {
928 "Active".into()
929 },
930 },
931 ObjectTypeInfo {
932 object_type: "Binary Input".into(),
933 count: per_type,
934 status: if per_type == 0 {
935 "Not created".into()
936 } else {
937 "Active".into()
938 },
939 },
940 ObjectTypeInfo {
941 object_type: "Binary Output".into(),
942 count: per_type,
943 status: if per_type == 0 {
944 "Not created".into()
945 } else {
946 "Active".into()
947 },
948 },
949 ],
950 status: "Online".into(),
951 };
952 let _ = ctx.output().write(&info);
953 }
954 }
955 }
956
957 if !is_quiet {
958 ctx.output().info("Press Ctrl+C to stop");
959 }
960 ctx.shutdown_signal().notified().await;
961
962 self.stop_server(ctx).await?;
963 if !is_quiet {
964 ctx.output().success("BACnet simulator stopped");
965 }
966
967 Ok(CommandOutput::quiet_success())
968 }
969}
970
971#[async_trait]
972impl ProtocolCommand for BacnetCommand {
973 fn protocol(&self) -> Protocol {
974 Protocol::BacnetIp
975 }
976
977 fn default_port(&self) -> u16 {
978 47808
979 }
980
981 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
982 let output = ctx.output();
983 let spinner = output.spinner("Starting BACnet server...");
984
985 let config = BacnetServerConfig::new(self.device_instance)
986 .with_bind_addr(self.bind_addr)
987 .with_device_name("Mabinogion BACnet Simulator");
988
989 let registry = ObjectRegistry::new();
990
991 if self.objects > 0 {
992 let descriptors = default_object_descriptors();
993 let objects_per_type = std::cmp::max(1, self.objects / descriptors.len());
994 registry.populate_standard_objects(&descriptors, objects_per_type);
995 }
996
997 let server = Arc::new(BACnetServer::new(config, registry));
998
999 {
1000 let mut server_guard = self.server.lock().await;
1001 *server_guard = Some(server.clone());
1002 }
1003
1004 let server_clone = server.clone();
1005 let task = tokio::spawn(async move {
1006 if let Err(e) = server_clone.run().await {
1007 tracing::error!("BACnet server error: {}", e);
1008 }
1009 });
1010
1011 {
1012 let mut task_guard = self.server_task.lock().await;
1013 *task_guard = Some(task);
1014 }
1015
1016 tokio::time::sleep(Duration::from_millis(100)).await;
1017
1018 spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
1019 Ok(())
1020 }
1021
1022 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1023 if let Some(server) = self.server.lock().await.as_ref() {
1024 server.shutdown();
1025 }
1026
1027 if let Some(task) = self.server_task.lock().await.take() {
1028 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1029 }
1030
1031 Ok(())
1032 }
1033}
1034
/// Command that runs the KNXnet/IP simulator.
pub struct KnxCommand {
    /// Socket address the server binds to.
    bind_addr: SocketAddr,
    /// Individual address in textual form; parsed when the server starts.
    individual_address: String,
    /// Number of group objects created when the server starts.
    group_objects: usize,
    /// Tags supplied through `with_tags`.
    tags: Tags,
    /// Server instance stored by `start_server` and taken by `stop_server`.
    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
    /// Handle of the spawned task that runs the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
1051
1052impl KnxCommand {
1053 pub fn new() -> Self {
1054 Self {
1055 bind_addr: "0.0.0.0:3671".parse().unwrap(),
1056 individual_address: "1.1.1".into(),
1057 group_objects: 100,
1058 tags: Tags::new(),
1059 server: Arc::new(Mutex::new(None)),
1060 server_task: Arc::new(Mutex::new(None)),
1061 }
1062 }
1063
1064 pub fn with_port(mut self, port: u16) -> Self {
1065 self.bind_addr.set_port(port);
1066 self
1067 }
1068
1069 pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
1070 self.individual_address = addr.into();
1071 self
1072 }
1073
1074 pub fn with_group_objects(mut self, count: usize) -> Self {
1075 self.group_objects = count;
1076 self
1077 }
1078
1079 pub fn with_tags(mut self, tags: Tags) -> Self {
1080 self.tags = tags;
1081 self
1082 }
1083}
1084
1085impl Default for KnxCommand {
1086 fn default() -> Self {
1087 Self::new()
1088 }
1089}
1090
1091#[async_trait]
1092impl Command for KnxCommand {
1093 fn name(&self) -> &str {
1094 "knx"
1095 }
1096
1097 fn description(&self) -> &str {
1098 "Start a KNXnet/IP simulator"
1099 }
1100
1101 fn requires_engine(&self) -> bool {
1102 true
1103 }
1104
1105 fn supports_shutdown(&self) -> bool {
1106 true
1107 }
1108
1109 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
1110 let format = ctx.output().format();
1111 let is_quiet = ctx.is_quiet();
1112 let is_verbose = ctx.is_verbose();
1113 let is_debug = ctx.is_debug();
1114
1115 if !is_quiet {
1116 if matches!(format, OutputFormat::Table) {
1117 let output = ctx.output();
1118 output.header("KNXnet/IP Simulator");
1119 output.kv("Bind Address", self.bind_addr);
1120 output.kv("Individual Address", &self.individual_address);
1121 output.kv("Group Objects", self.group_objects);
1122 }
1123 }
1124
1125 if is_verbose {
1127 ctx.vprintln(format!(" Max Connections: 10"));
1128 ctx.vprintln(format!(" Services: Core, Device Management, Tunneling"));
1129 }
1130
1131 if is_debug {
1133 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
1134 ctx.dprintln(format!("Individual address: {}", self.individual_address));
1135 ctx.dprintln(format!("Group objects: {}", self.group_objects));
1136 }
1137
1138 self.start_server(ctx).await?;
1139
1140 if !is_quiet {
1141 match format {
1142 OutputFormat::Table => {
1143 let colors_enabled = ctx.colors_enabled();
1144 let table = TableBuilder::new(colors_enabled)
1145 .header(["Service", "Status"])
1146 .status_row(["Core", "Ready"], StatusType::Success)
1147 .status_row(["Device Management", "Ready"], StatusType::Success)
1148 .status_row(["Tunneling", "Ready"], StatusType::Success);
1149 table.print();
1150 }
1151 _ => {
1152 #[derive(Serialize)]
1153 struct KnxServerInfo {
1154 protocol: String,
1155 bind_address: String,
1156 individual_address: String,
1157 group_objects: usize,
1158 services: Vec<ServiceInfo>,
1159 status: String,
1160 }
1161 #[derive(Serialize)]
1162 struct ServiceInfo {
1163 service: String,
1164 status: String,
1165 }
1166 let info = KnxServerInfo {
1167 protocol: "KNXnet/IP".into(),
1168 bind_address: self.bind_addr.to_string(),
1169 individual_address: self.individual_address.clone(),
1170 group_objects: self.group_objects,
1171 services: vec![
1172 ServiceInfo {
1173 service: "Core".into(),
1174 status: "Ready".into(),
1175 },
1176 ServiceInfo {
1177 service: "Device Management".into(),
1178 status: "Ready".into(),
1179 },
1180 ServiceInfo {
1181 service: "Tunneling".into(),
1182 status: "Ready".into(),
1183 },
1184 ],
1185 status: "Online".into(),
1186 };
1187 let _ = ctx.output().write(&info);
1188 }
1189 }
1190 }
1191
1192 if !is_quiet {
1193 ctx.output().info("Press Ctrl+C to stop");
1194 }
1195 ctx.shutdown_signal().notified().await;
1196
1197 self.stop_server(ctx).await?;
1198 if !is_quiet {
1199 ctx.output().success("KNX simulator stopped");
1200 }
1201
1202 Ok(CommandOutput::quiet_success())
1203 }
1204}
1205
1206#[async_trait]
1207impl ProtocolCommand for KnxCommand {
1208 fn protocol(&self) -> Protocol {
1209 Protocol::KnxIp
1210 }
1211
1212 fn default_port(&self) -> u16 {
1213 3671
1214 }
1215
1216 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1217 let output = ctx.output();
1218 let spinner = output.spinner("Starting KNX server...");
1219
1220 let individual_address: IndividualAddress =
1222 self.individual_address.parse().map_err(|_| {
1223 crate::error::CliError::ExecutionFailed {
1224 message: format!("Invalid individual address: {}", self.individual_address),
1225 }
1226 })?;
1227
1228 let config = KnxServerConfig {
1229 bind_addr: self.bind_addr,
1230 individual_address,
1231 max_connections: 256,
1232 ..Default::default()
1233 };
1234
1235 let group_table = Arc::new(GroupObjectTable::new());
1237 let dpt_types = [
1238 DptId::new(1, 1), DptId::new(5, 1), DptId::new(9, 1), DptId::new(9, 4), DptId::new(9, 7), DptId::new(12, 1), DptId::new(13, 1), DptId::new(14, 56), ];
1247 let dpt_names = [
1248 "Switch",
1249 "Scaling",
1250 "Temperature",
1251 "Lux",
1252 "Humidity",
1253 "Counter",
1254 "SignedCounter",
1255 "Float",
1256 ];
1257
1258 for i in 0..self.group_objects {
1259 let main = ((i / 256) + 1) as u8;
1260 let middle = ((i / 8) % 8) as u8;
1261 let sub = (i % 256) as u8;
1262 let addr = GroupAddress::three_level(main, middle, sub);
1263 let dpt_idx = i % dpt_types.len();
1264 let name = format!("{}_{}", dpt_names[dpt_idx], i);
1265 if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1266 tracing::warn!("Failed to create group object {}: {}", i, e);
1267 }
1268 }
1269
1270 let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1271
1272 {
1273 let mut server_guard = self.server.lock().await;
1274 *server_guard = Some(server.clone());
1275 }
1276
1277 let server_clone = server.clone();
1278 let task = tokio::spawn(async move {
1279 if let Err(e) = server_clone.start().await {
1280 tracing::error!("KNX server error: {}", e);
1281 }
1282 });
1283
1284 {
1285 let mut task_guard = self.server_task.lock().await;
1286 *task_guard = Some(task);
1287 }
1288
1289 tokio::time::sleep(Duration::from_millis(100)).await;
1290
1291 spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1292 Ok(())
1293 }
1294
1295 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1296 let server_opt = self.server.lock().await.take();
1298 if let Some(server) = server_opt {
1299 let _ = tokio::task::spawn_blocking(move || {
1301 let rt = tokio::runtime::Handle::current();
1302 rt.block_on(async {
1303 let _ = server.stop().await;
1304 })
1305 })
1306 .await;
1307 }
1308
1309 if let Some(task) = self.server_task.lock().await.take() {
1310 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1311 }
1312
1313 Ok(())
1314 }
1315}