1use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{OutputFormat, PaginatedTable, StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use mabi_core::tags::Tags;
12use serde::Serialize;
13use std::net::SocketAddr;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18
19use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
21use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
22use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, default_object_descriptors};
23use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress, GroupAddress, DptId, GroupObjectTable};
24
25async fn check_port_availability(addr: SocketAddr) {
30 use tokio::io::{AsyncReadExt, AsyncWriteExt};
31 use tokio::net::TcpStream;
32
33 let connect_result =
35 tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await;
36
37 match connect_result {
38 Ok(Ok(_first_stream)) => {
39 drop(_first_stream);
41
42 let probe_result = tokio::time::timeout(Duration::from_secs(1), async {
43 let mut stream = TcpStream::connect(addr).await?;
44 let request: [u8; 12] = [
46 0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x01, 0x03, 0x00, 0x00, 0x00, 0x01,
47 ];
48 stream.write_all(&request).await?;
49 let mut response = [0u8; 32];
50 let n = stream.read(&mut response).await?;
51 Ok::<_, std::io::Error>(n)
52 })
53 .await;
54
55 match probe_result {
56 Ok(Ok(n)) if n >= 7 => {
57 tracing::warn!(
58 port = addr.port(),
59 "Port {} is already in use by a responding Modbus server. \
60 The new server will fail to bind.",
61 addr.port()
62 );
63 }
64 _ => {
65 tracing::warn!(
67 port = addr.port(),
68 "Port {} is in use: TCP connects but no Modbus response. \
69 This may be a suspended (zombie) process holding the port.\n \
70 Diagnostic: lsof -i :{} | grep LISTEN\n \
71 To kill: kill $(lsof -ti :{} -sTCP:LISTEN)",
72 addr.port(),
73 addr.port(),
74 addr.port()
75 );
76 }
77 }
78 }
79 Ok(Err(_)) | Err(_) => {
80 tracing::debug!(port = addr.port(), "Port {} is available", addr.port());
82 }
83 }
84}
85
86#[async_trait]
88pub trait ProtocolCommand: Command {
89 fn protocol(&self) -> Protocol;
91
92 fn default_port(&self) -> u16;
94
95 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;
97
98 async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
100}
101
102pub struct ModbusCommand {
108 bind_addr: SocketAddr,
110 devices: usize,
112 points_per_device: usize,
114 rtu_mode: bool,
116 serial_port: Option<String>,
118 tags: Tags,
120 server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
122 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
124}
125
126impl ModbusCommand {
127 pub fn new() -> Self {
128 Self {
129 bind_addr: "0.0.0.0:502".parse().unwrap(),
130 devices: 1,
131 points_per_device: 100,
132 rtu_mode: false,
133 serial_port: None,
134 tags: Tags::new(),
135 server: Arc::new(Mutex::new(None)),
136 server_task: Arc::new(Mutex::new(None)),
137 }
138 }
139
140 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
141 self.bind_addr = addr;
142 self
143 }
144
145 pub fn with_port(mut self, port: u16) -> Self {
146 self.bind_addr.set_port(port);
147 self
148 }
149
150 pub fn with_devices(mut self, devices: usize) -> Self {
151 self.devices = devices;
152 self
153 }
154
155 pub fn with_points(mut self, points: usize) -> Self {
156 self.points_per_device = points;
157 self
158 }
159
160 pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
161 self.rtu_mode = true;
162 self.serial_port = Some(serial_port.into());
163 self
164 }
165
166 pub fn with_tags(mut self, tags: Tags) -> Self {
167 self.tags = tags;
168 self
169 }
170}
171
172impl Default for ModbusCommand {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
178#[async_trait]
179impl Command for ModbusCommand {
180 fn name(&self) -> &str {
181 "modbus"
182 }
183
184 fn description(&self) -> &str {
185 "Start a Modbus TCP/RTU simulator"
186 }
187
188 fn requires_engine(&self) -> bool {
189 true
190 }
191
192 fn supports_shutdown(&self) -> bool {
193 true
194 }
195
196 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
197 let format = ctx.output().format();
198 let is_quiet = ctx.is_quiet();
199 let is_verbose = ctx.is_verbose();
200 let is_debug = ctx.is_debug();
201
202 if !is_quiet {
203 if matches!(format, OutputFormat::Table) {
204 let output = ctx.output();
205 if self.rtu_mode {
206 output.header("Modbus RTU Simulator");
207 output.kv(
208 "Serial Port",
209 self.serial_port.as_deref().unwrap_or("N/A"),
210 );
211 } else {
212 output.header("Modbus TCP Simulator");
213 output.kv("Bind Address", self.bind_addr);
214 }
215 output.kv("Devices", self.devices);
216 output.kv("Points per Device", self.points_per_device);
217 output.kv("Total Points", self.devices * self.points_per_device);
218 }
219 }
220
221 if is_verbose {
223 ctx.vprintln(format!(" Protocol Mode: {}", if self.rtu_mode { "RTU" } else { "TCP" }));
224 ctx.vprintln(format!(" Points Distribution: {} per register type", self.points_per_device / 4));
225 }
226
227 if is_debug {
229 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
230 ctx.dprintln(format!("RTU mode: {}, Serial: {:?}", self.rtu_mode, self.serial_port));
231 ctx.dprintln(format!("Devices: {}, Points/device: {}", self.devices, self.points_per_device));
232 }
233
234 self.start_server(ctx).await?;
235
236 let points_per_type = self.points_per_device / 4;
237
238 if !is_quiet {
239 match format {
240 OutputFormat::Table => {
241 let colors_enabled = ctx.colors_enabled();
242 let builder = TableBuilder::new(colors_enabled)
243 .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"]);
244
245 let devices = self.devices;
246 let pts = points_per_type.to_string();
247 let table = PaginatedTable::default().render(builder, devices, 6, |i| {
248 let unit_id = (i + 1).to_string();
249 (
250 vec![unit_id, pts.clone(), pts.clone(), pts.clone(), pts.clone(), "Online".into()],
251 StatusType::Success,
252 )
253 });
254 table.print();
255 }
256 _ => {
257 #[derive(Serialize)]
258 struct ModbusServerInfo {
259 protocol: String,
260 bind_address: String,
261 devices: usize,
262 points_per_device: usize,
263 total_points: usize,
264 rtu_mode: bool,
265 serial_port: Option<String>,
266 device_list: Vec<ModbusDeviceInfo>,
267 status: String,
268 }
269 #[derive(Serialize)]
270 struct ModbusDeviceInfo {
271 unit_id: usize,
272 holding_registers: usize,
273 input_registers: usize,
274 coils: usize,
275 discrete_inputs: usize,
276 status: String,
277 }
278 let device_list: Vec<ModbusDeviceInfo> = (0..self.devices)
279 .map(|i| ModbusDeviceInfo {
280 unit_id: i + 1,
281 holding_registers: points_per_type,
282 input_registers: points_per_type,
283 coils: points_per_type,
284 discrete_inputs: points_per_type,
285 status: "Online".into(),
286 })
287 .collect();
288 let info = ModbusServerInfo {
289 protocol: if self.rtu_mode { "Modbus RTU".into() } else { "Modbus TCP".into() },
290 bind_address: self.bind_addr.to_string(),
291 devices: self.devices,
292 points_per_device: self.points_per_device,
293 total_points: self.devices * self.points_per_device,
294 rtu_mode: self.rtu_mode,
295 serial_port: self.serial_port.clone(),
296 device_list,
297 status: "Online".into(),
298 };
299 let _ = ctx.output().write(&info);
300 }
301 }
302 }
303
304 if !is_quiet {
305 ctx.output().info("Press Ctrl+C to stop");
306 }
307 ctx.shutdown_signal().notified().await;
308
309 self.stop_server(ctx).await?;
310 if !is_quiet {
311 ctx.output().success("Modbus simulator stopped");
312 }
313
314 Ok(CommandOutput::quiet_success())
315 }
316}
317
318#[async_trait]
319impl ProtocolCommand for ModbusCommand {
320 fn protocol(&self) -> Protocol {
321 if self.rtu_mode {
322 Protocol::ModbusRtu
323 } else {
324 Protocol::ModbusTcp
325 }
326 }
327
328 fn default_port(&self) -> u16 {
329 502
330 }
331
332 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
333 let output = ctx.output();
334
335 check_port_availability(self.bind_addr).await;
337
338 let spinner = output.spinner("Starting Modbus server...");
339
340 let config = ServerConfigV2 {
341 bind_address: self.bind_addr,
342 ..Default::default()
343 };
344
345 let server = Arc::new(ModbusTcpServerV2::new(config));
346
347 for i in 0..self.devices {
348 let unit_id = (i + 1) as u8;
349 let points = (self.points_per_device / 4) as u16;
350 let device_config = ModbusDeviceConfig {
351 unit_id,
352 name: format!("Device-{}", unit_id),
353 holding_registers: points,
354 input_registers: points,
355 coils: points,
356 discrete_inputs: points,
357 response_delay_ms: 0,
358 tags: self.tags.clone(),
359 };
360 let device = ModbusDevice::new(device_config);
361 server.add_device(device);
362 }
363
364 {
365 let mut server_guard = self.server.lock().await;
366 *server_guard = Some(server.clone());
367 }
368
369 let server_clone = server.clone();
370 let task = tokio::spawn(async move {
371 if let Err(e) = server_clone.run().await {
372 tracing::error!("Modbus server error: {}", e);
373 }
374 });
375
376 {
377 let mut task_guard = self.server_task.lock().await;
378 *task_guard = Some(task);
379 }
380
381 tokio::time::sleep(Duration::from_millis(100)).await;
382
383 {
385 let task_guard = self.server_task.lock().await;
386 if let Some(task) = task_guard.as_ref() {
387 if task.is_finished() {
388 spinner.finish_with_message("Failed to start server");
389 return Err(crate::error::CliError::PortInUse {
390 port: self.bind_addr.port(),
391 });
392 }
393 }
394 }
395
396 spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
397 Ok(())
398 }
399
400 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
401 if let Some(server) = self.server.lock().await.as_ref() {
402 server.shutdown();
403 }
404
405 if let Some(task) = self.server_task.lock().await.take() {
406 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
407 }
408
409 Ok(())
410 }
411}
412
413pub struct OpcuaCommand {
419 bind_addr: SocketAddr,
420 endpoint_path: String,
421 nodes: usize,
422 security_mode: String,
423 tags: Tags,
425 server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
427 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
429}
430
431impl OpcuaCommand {
432 pub fn new() -> Self {
433 Self {
434 bind_addr: "0.0.0.0:4840".parse().unwrap(),
435 endpoint_path: "/".into(),
436 nodes: 1000,
437 security_mode: "None".into(),
438 tags: Tags::new(),
439 server: Arc::new(Mutex::new(None)),
440 server_task: Arc::new(Mutex::new(None)),
441 }
442 }
443
444 pub fn with_port(mut self, port: u16) -> Self {
445 self.bind_addr.set_port(port);
446 self
447 }
448
449 pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
450 self.endpoint_path = path.into();
451 self
452 }
453
454 pub fn with_nodes(mut self, nodes: usize) -> Self {
455 self.nodes = nodes;
456 self
457 }
458
459 pub fn with_security(mut self, mode: impl Into<String>) -> Self {
460 self.security_mode = mode.into();
461 self
462 }
463
464 pub fn with_tags(mut self, tags: Tags) -> Self {
465 self.tags = tags;
466 self
467 }
468}
469
470impl Default for OpcuaCommand {
471 fn default() -> Self {
472 Self::new()
473 }
474}
475
476#[async_trait]
477impl Command for OpcuaCommand {
478 fn name(&self) -> &str {
479 "opcua"
480 }
481
482 fn description(&self) -> &str {
483 "Start an OPC UA server simulator"
484 }
485
486 fn requires_engine(&self) -> bool {
487 true
488 }
489
490 fn supports_shutdown(&self) -> bool {
491 true
492 }
493
494 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
495 let format = ctx.output().format();
496 let is_quiet = ctx.is_quiet();
497 let is_verbose = ctx.is_verbose();
498 let is_debug = ctx.is_debug();
499
500 if !is_quiet {
501 if matches!(format, OutputFormat::Table) {
502 let output = ctx.output();
503 output.header("OPC UA Simulator");
504 output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
505 output.kv("Nodes", self.nodes);
506 output.kv("Security Mode", &self.security_mode);
507 }
508 }
509
510 if is_verbose {
512 ctx.vprintln(format!(" Bind Address: {}", self.bind_addr));
513 ctx.vprintln(format!(" Endpoint Path: {}", self.endpoint_path));
514 ctx.vprintln(format!(" Max Subscriptions: 1000"));
515 ctx.vprintln(format!(" Max Monitored Items: 10000"));
516 }
517
518 if is_debug {
520 ctx.dprintln(format!("Full endpoint URL: opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
521 ctx.dprintln(format!("Node count: {}", self.nodes));
522 ctx.dprintln(format!("Security mode: {}", self.security_mode));
523 ctx.dprintln(format!("Sample nodes created: {}", self.nodes.min(100)));
524 }
525
526 self.start_server(ctx).await?;
527
528 if !is_quiet {
529 match format {
530 OutputFormat::Table => {
531 let colors_enabled = ctx.colors_enabled();
532 let table = TableBuilder::new(colors_enabled)
533 .header(["Namespace", "Nodes", "Subscriptions", "Status"])
534 .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
535 .status_row(
536 ["1", &self.nodes.to_string(), "0", "Online"],
537 StatusType::Success,
538 );
539 table.print();
540 }
541 _ => {
542 #[derive(Serialize)]
543 struct OpcuaServerInfo {
544 protocol: String,
545 endpoint: String,
546 nodes: usize,
547 security_mode: String,
548 namespaces: Vec<NamespaceInfo>,
549 status: String,
550 }
551 #[derive(Serialize)]
552 struct NamespaceInfo {
553 index: u32,
554 nodes: String,
555 subscriptions: u32,
556 status: String,
557 }
558 let info = OpcuaServerInfo {
559 protocol: "OPC UA".into(),
560 endpoint: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
561 nodes: self.nodes,
562 security_mode: self.security_mode.clone(),
563 namespaces: vec![
564 NamespaceInfo { index: 0, nodes: "Standard".into(), subscriptions: 0, status: "Ready".into() },
565 NamespaceInfo { index: 1, nodes: self.nodes.to_string(), subscriptions: 0, status: "Online".into() },
566 ],
567 status: "Online".into(),
568 };
569 let _ = ctx.output().write(&info);
570 }
571 }
572 }
573
574 if !is_quiet {
575 ctx.output().info("Press Ctrl+C to stop");
576 }
577 ctx.shutdown_signal().notified().await;
578
579 self.stop_server(ctx).await?;
580 if !is_quiet {
581 ctx.output().success("OPC UA simulator stopped");
582 }
583
584 Ok(CommandOutput::quiet_success())
585 }
586}
587
588#[async_trait]
589impl ProtocolCommand for OpcuaCommand {
590 fn protocol(&self) -> Protocol {
591 Protocol::OpcUa
592 }
593
594 fn default_port(&self) -> u16 {
595 4840
596 }
597
598 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
599 let output = ctx.output();
600 let spinner = output.spinner("Starting OPC UA server...");
601
602 let config = OpcUaServerConfig {
603 endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
604 server_name: "Mabinogion OPC UA Simulator".to_string(),
605 max_subscriptions: 1000,
606 max_monitored_items: 10000,
607 ..Default::default()
608 };
609
610 let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
611 crate::error::CliError::ExecutionFailed {
612 message: format!("Failed to create OPC UA server: {}", e)
613 }
614 })?);
615
616 let node_count = self.nodes.min(100);
619 for i in 0..node_count {
620 let node_id = format!("ns=2;i={}", 1000 + i);
621 let name = format!("Variable_{}", i);
622 let value = (i as f64) * 0.1;
623
624 if i % 2 == 0 {
625 let _ = server.add_writable_variable(node_id, name, value);
626 } else {
627 let _ = server.add_variable(node_id, name, value);
628 }
629 }
630
631 {
632 let mut server_guard = self.server.lock().await;
633 *server_guard = Some(server.clone());
634 }
635
636 let server_clone = server.clone();
637 let task = tokio::spawn(async move {
638 if let Err(e) = server_clone.start().await {
639 tracing::error!("OPC UA server error: {}", e);
640 }
641 });
642
643 {
644 let mut task_guard = self.server_task.lock().await;
645 *task_guard = Some(task);
646 }
647
648 tokio::time::sleep(Duration::from_millis(100)).await;
649
650 spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
651 Ok(())
652 }
653
654 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
655 if let Some(server) = self.server.lock().await.as_ref() {
656 let _ = server.stop().await;
657 }
658
659 if let Some(task) = self.server_task.lock().await.take() {
660 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
661 }
662
663 Ok(())
664 }
665}
666
667pub struct BacnetCommand {
673 bind_addr: SocketAddr,
674 device_instance: u32,
675 objects: usize,
676 bbmd_enabled: bool,
677 tags: Tags,
679 server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
681 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
683}
684
685impl BacnetCommand {
686 pub fn new() -> Self {
687 Self {
688 bind_addr: "0.0.0.0:47808".parse().unwrap(),
689 device_instance: 1234,
690 objects: 100,
691 bbmd_enabled: false,
692 tags: Tags::new(),
693 server: Arc::new(Mutex::new(None)),
694 server_task: Arc::new(Mutex::new(None)),
695 }
696 }
697
698 pub fn with_port(mut self, port: u16) -> Self {
699 self.bind_addr.set_port(port);
700 self
701 }
702
703 pub fn with_device_instance(mut self, instance: u32) -> Self {
704 self.device_instance = instance;
705 self
706 }
707
708 pub fn with_objects(mut self, objects: usize) -> Self {
709 self.objects = objects;
710 self
711 }
712
713 pub fn with_bbmd(mut self, enabled: bool) -> Self {
714 self.bbmd_enabled = enabled;
715 self
716 }
717
718 pub fn with_tags(mut self, tags: Tags) -> Self {
719 self.tags = tags;
720 self
721 }
722}
723
724impl Default for BacnetCommand {
725 fn default() -> Self {
726 Self::new()
727 }
728}
729
730#[async_trait]
731impl Command for BacnetCommand {
732 fn name(&self) -> &str {
733 "bacnet"
734 }
735
736 fn description(&self) -> &str {
737 "Start a BACnet/IP simulator"
738 }
739
740 fn requires_engine(&self) -> bool {
741 true
742 }
743
744 fn supports_shutdown(&self) -> bool {
745 true
746 }
747
748 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
749 let format = ctx.output().format();
750 let is_quiet = ctx.is_quiet();
751 let is_verbose = ctx.is_verbose();
752 let is_debug = ctx.is_debug();
753
754 if !is_quiet {
755 if matches!(format, OutputFormat::Table) {
756 let output = ctx.output();
757 output.header("BACnet/IP Simulator");
758 output.kv("Bind Address", self.bind_addr);
759 output.kv("Device Instance", self.device_instance);
760 output.kv("Objects", self.objects);
761 output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
762 }
763 }
764
765 if is_verbose {
767 let per_type = self.objects / 4;
768 ctx.vprintln(format!(" Objects per Type: {} (AI: {}, AO: {}, BI: {}, BO: {})", per_type, per_type, per_type, per_type, per_type));
769 ctx.vprintln(format!(" Device Name: Mabinogion BACnet Simulator"));
770 }
771
772 if is_debug {
774 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
775 ctx.dprintln(format!("Device instance: {}", self.device_instance));
776 ctx.dprintln(format!("Total objects: {}, BBMD: {}", self.objects, self.bbmd_enabled));
777 }
778
779 self.start_server(ctx).await?;
780
781 let per_type = self.objects / 4;
782
783 if !is_quiet {
784 match format {
785 OutputFormat::Table => {
786 let colors_enabled = ctx.colors_enabled();
787 let table = TableBuilder::new(colors_enabled)
788 .header(["Object Type", "Count", "Status"])
789 .status_row(["Device", "1", "Online"], StatusType::Success)
790 .status_row(["Analog Input", &per_type.to_string(), "Active"], StatusType::Success)
791 .status_row(["Analog Output", &per_type.to_string(), "Active"], StatusType::Success)
792 .status_row(["Binary Input", &per_type.to_string(), "Active"], StatusType::Success)
793 .status_row(["Binary Output", &per_type.to_string(), "Active"], StatusType::Success);
794 table.print();
795 }
796 _ => {
797 #[derive(Serialize)]
798 struct BacnetServerInfo {
799 protocol: String,
800 bind_address: String,
801 device_instance: u32,
802 objects: usize,
803 bbmd_enabled: bool,
804 object_types: Vec<ObjectTypeInfo>,
805 status: String,
806 }
807 #[derive(Serialize)]
808 struct ObjectTypeInfo {
809 object_type: String,
810 count: usize,
811 status: String,
812 }
813 let info = BacnetServerInfo {
814 protocol: "BACnet/IP".into(),
815 bind_address: self.bind_addr.to_string(),
816 device_instance: self.device_instance,
817 objects: self.objects,
818 bbmd_enabled: self.bbmd_enabled,
819 object_types: vec![
820 ObjectTypeInfo { object_type: "Device".into(), count: 1, status: "Online".into() },
821 ObjectTypeInfo { object_type: "Analog Input".into(), count: per_type, status: "Active".into() },
822 ObjectTypeInfo { object_type: "Analog Output".into(), count: per_type, status: "Active".into() },
823 ObjectTypeInfo { object_type: "Binary Input".into(), count: per_type, status: "Active".into() },
824 ObjectTypeInfo { object_type: "Binary Output".into(), count: per_type, status: "Active".into() },
825 ],
826 status: "Online".into(),
827 };
828 let _ = ctx.output().write(&info);
829 }
830 }
831 }
832
833 if !is_quiet {
834 ctx.output().info("Press Ctrl+C to stop");
835 }
836 ctx.shutdown_signal().notified().await;
837
838 self.stop_server(ctx).await?;
839 if !is_quiet {
840 ctx.output().success("BACnet simulator stopped");
841 }
842
843 Ok(CommandOutput::quiet_success())
844 }
845}
846
847#[async_trait]
848impl ProtocolCommand for BacnetCommand {
849 fn protocol(&self) -> Protocol {
850 Protocol::BacnetIp
851 }
852
853 fn default_port(&self) -> u16 {
854 47808
855 }
856
857 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
858 let output = ctx.output();
859 let spinner = output.spinner("Starting BACnet server...");
860
861 let config = BacnetServerConfig::new(self.device_instance)
862 .with_bind_addr(self.bind_addr)
863 .with_device_name("Mabinogion BACnet Simulator");
864
865 let registry = ObjectRegistry::new();
867
868 let descriptors = default_object_descriptors();
869 let objects_per_type = self.objects / descriptors.len();
870 registry.populate_standard_objects(&descriptors, objects_per_type);
871
872 let server = Arc::new(BACnetServer::new(config, registry));
873
874 {
875 let mut server_guard = self.server.lock().await;
876 *server_guard = Some(server.clone());
877 }
878
879 let server_clone = server.clone();
880 let task = tokio::spawn(async move {
881 if let Err(e) = server_clone.run().await {
882 tracing::error!("BACnet server error: {}", e);
883 }
884 });
885
886 {
887 let mut task_guard = self.server_task.lock().await;
888 *task_guard = Some(task);
889 }
890
891 tokio::time::sleep(Duration::from_millis(100)).await;
892
893 spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
894 Ok(())
895 }
896
897 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
898 if let Some(server) = self.server.lock().await.as_ref() {
899 server.shutdown();
900 }
901
902 if let Some(task) = self.server_task.lock().await.take() {
903 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
904 }
905
906 Ok(())
907 }
908}
909
910pub struct KnxCommand {
916 bind_addr: SocketAddr,
917 individual_address: String,
918 group_objects: usize,
919 tags: Tags,
921 server: Arc<Mutex<Option<Arc<KnxServer>>>>,
923 server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
925}
926
927impl KnxCommand {
928 pub fn new() -> Self {
929 Self {
930 bind_addr: "0.0.0.0:3671".parse().unwrap(),
931 individual_address: "1.1.1".into(),
932 group_objects: 100,
933 tags: Tags::new(),
934 server: Arc::new(Mutex::new(None)),
935 server_task: Arc::new(Mutex::new(None)),
936 }
937 }
938
939 pub fn with_port(mut self, port: u16) -> Self {
940 self.bind_addr.set_port(port);
941 self
942 }
943
944 pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
945 self.individual_address = addr.into();
946 self
947 }
948
949 pub fn with_group_objects(mut self, count: usize) -> Self {
950 self.group_objects = count;
951 self
952 }
953
954 pub fn with_tags(mut self, tags: Tags) -> Self {
955 self.tags = tags;
956 self
957 }
958}
959
960impl Default for KnxCommand {
961 fn default() -> Self {
962 Self::new()
963 }
964}
965
966#[async_trait]
967impl Command for KnxCommand {
968 fn name(&self) -> &str {
969 "knx"
970 }
971
972 fn description(&self) -> &str {
973 "Start a KNXnet/IP simulator"
974 }
975
976 fn requires_engine(&self) -> bool {
977 true
978 }
979
980 fn supports_shutdown(&self) -> bool {
981 true
982 }
983
984 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
985 let format = ctx.output().format();
986 let is_quiet = ctx.is_quiet();
987 let is_verbose = ctx.is_verbose();
988 let is_debug = ctx.is_debug();
989
990 if !is_quiet {
991 if matches!(format, OutputFormat::Table) {
992 let output = ctx.output();
993 output.header("KNXnet/IP Simulator");
994 output.kv("Bind Address", self.bind_addr);
995 output.kv("Individual Address", &self.individual_address);
996 output.kv("Group Objects", self.group_objects);
997 }
998 }
999
1000 if is_verbose {
1002 ctx.vprintln(format!(" Max Connections: 10"));
1003 ctx.vprintln(format!(" Services: Core, Device Management, Tunneling"));
1004 }
1005
1006 if is_debug {
1008 ctx.dprintln(format!("Bind address: {}", self.bind_addr));
1009 ctx.dprintln(format!("Individual address: {}", self.individual_address));
1010 ctx.dprintln(format!("Group objects: {}", self.group_objects));
1011 }
1012
1013 self.start_server(ctx).await?;
1014
1015 if !is_quiet {
1016 match format {
1017 OutputFormat::Table => {
1018 let colors_enabled = ctx.colors_enabled();
1019 let table = TableBuilder::new(colors_enabled)
1020 .header(["Service", "Status"])
1021 .status_row(["Core", "Ready"], StatusType::Success)
1022 .status_row(["Device Management", "Ready"], StatusType::Success)
1023 .status_row(["Tunneling", "Ready"], StatusType::Success);
1024 table.print();
1025 }
1026 _ => {
1027 #[derive(Serialize)]
1028 struct KnxServerInfo {
1029 protocol: String,
1030 bind_address: String,
1031 individual_address: String,
1032 group_objects: usize,
1033 services: Vec<ServiceInfo>,
1034 status: String,
1035 }
1036 #[derive(Serialize)]
1037 struct ServiceInfo {
1038 service: String,
1039 status: String,
1040 }
1041 let info = KnxServerInfo {
1042 protocol: "KNXnet/IP".into(),
1043 bind_address: self.bind_addr.to_string(),
1044 individual_address: self.individual_address.clone(),
1045 group_objects: self.group_objects,
1046 services: vec![
1047 ServiceInfo { service: "Core".into(), status: "Ready".into() },
1048 ServiceInfo { service: "Device Management".into(), status: "Ready".into() },
1049 ServiceInfo { service: "Tunneling".into(), status: "Ready".into() },
1050 ],
1051 status: "Online".into(),
1052 };
1053 let _ = ctx.output().write(&info);
1054 }
1055 }
1056 }
1057
1058 if !is_quiet {
1059 ctx.output().info("Press Ctrl+C to stop");
1060 }
1061 ctx.shutdown_signal().notified().await;
1062
1063 self.stop_server(ctx).await?;
1064 if !is_quiet {
1065 ctx.output().success("KNX simulator stopped");
1066 }
1067
1068 Ok(CommandOutput::quiet_success())
1069 }
1070}
1071
1072#[async_trait]
1073impl ProtocolCommand for KnxCommand {
1074 fn protocol(&self) -> Protocol {
1075 Protocol::KnxIp
1076 }
1077
1078 fn default_port(&self) -> u16 {
1079 3671
1080 }
1081
1082 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
1083 let output = ctx.output();
1084 let spinner = output.spinner("Starting KNX server...");
1085
1086 let individual_address: IndividualAddress = self.individual_address.parse()
1088 .map_err(|_| crate::error::CliError::ExecutionFailed {
1089 message: format!("Invalid individual address: {}", self.individual_address)
1090 })?;
1091
1092 let config = KnxServerConfig {
1093 bind_addr: self.bind_addr,
1094 individual_address,
1095 max_connections: 256,
1096 ..Default::default()
1097 };
1098
1099 let group_table = Arc::new(GroupObjectTable::new());
1101 let dpt_types = [
1102 DptId::new(1, 1), DptId::new(5, 1), DptId::new(9, 1), DptId::new(9, 4), DptId::new(9, 7), DptId::new(12, 1), DptId::new(13, 1), DptId::new(14, 56), ];
1111 let dpt_names = [
1112 "Switch", "Scaling", "Temperature", "Lux",
1113 "Humidity", "Counter", "SignedCounter", "Float",
1114 ];
1115
1116 for i in 0..self.group_objects {
1117 let main = ((i / 256) + 1) as u8;
1118 let middle = ((i / 8) % 8) as u8;
1119 let sub = (i % 256) as u8;
1120 let addr = GroupAddress::three_level(main, middle, sub);
1121 let dpt_idx = i % dpt_types.len();
1122 let name = format!("{}_{}", dpt_names[dpt_idx], i);
1123 if let Err(e) = group_table.create(addr, &name, &dpt_types[dpt_idx]) {
1124 tracing::warn!("Failed to create group object {}: {}", i, e);
1125 }
1126 }
1127
1128 let server = Arc::new(KnxServer::new(config).with_group_objects(group_table));
1129
1130 {
1131 let mut server_guard = self.server.lock().await;
1132 *server_guard = Some(server.clone());
1133 }
1134
1135 let server_clone = server.clone();
1136 let task = tokio::spawn(async move {
1137 if let Err(e) = server_clone.start().await {
1138 tracing::error!("KNX server error: {}", e);
1139 }
1140 });
1141
1142 {
1143 let mut task_guard = self.server_task.lock().await;
1144 *task_guard = Some(task);
1145 }
1146
1147 tokio::time::sleep(Duration::from_millis(100)).await;
1148
1149 spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
1150 Ok(())
1151 }
1152
1153 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
1154 let server_opt = self.server.lock().await.take();
1156 if let Some(server) = server_opt {
1157 let _ = tokio::task::spawn_blocking(move || {
1159 let rt = tokio::runtime::Handle::current();
1160 rt.block_on(async {
1161 let _ = server.stop().await;
1162 })
1163 }).await;
1164 }
1165
1166 if let Some(task) = self.server_task.lock().await.take() {
1167 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
1168 }
1169
1170 Ok(())
1171 }
1172}