1use crate::context::CliContext;
6use crate::error::CliResult;
7use crate::output::{StatusType, TableBuilder};
8use crate::runner::{Command, CommandOutput};
9use async_trait::async_trait;
10use mabi_core::prelude::*;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::Mutex;
15use tokio::task::JoinHandle;
16
17use mabi_modbus::{ModbusTcpServerV2, tcp::ServerConfigV2, ModbusDevice, ModbusDeviceConfig};
19use mabi_opcua::{OpcUaServer, OpcUaServerConfig};
20use mabi_bacnet::prelude::{BACnetServer, ServerConfig as BacnetServerConfig, ObjectRegistry, AnalogInput, AnalogOutput, BinaryInput, BinaryOutput};
21use mabi_knx::{KnxServer, KnxServerConfig, IndividualAddress};
22
/// A [`Command`] that fronts a protocol simulator server.
///
/// Adds protocol identification and explicit start/stop hooks on top of
/// [`Command`]; the implementations in this file call both hooks from
/// `execute`, around the wait for the shutdown signal.
#[async_trait]
pub trait ProtocolCommand: Command {
    /// Returns the protocol this command simulates.
    fn protocol(&self) -> Protocol;

    /// Returns the port conventionally used by the protocol.
    fn default_port(&self) -> u16;

    /// Creates the server and starts it.
    ///
    /// `ctx` supplies the output sink used for progress reporting.
    async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()>;

    /// Stops the server previously started by [`ProtocolCommand::start_server`].
    async fn stop_server(&self, ctx: &mut CliContext) -> CliResult<()>;
}
38
/// CLI command that runs a Modbus simulator.
pub struct ModbusCommand {
    /// Socket address the TCP server binds to.
    bind_addr: SocketAddr,
    /// Number of simulated devices; `start_server` numbers them from unit ID 1.
    devices: usize,
    /// Data points per device; `start_server` gives each of the four tables
    /// (holding registers, input registers, coils, discrete inputs) a quarter.
    points_per_device: usize,
    /// Set by `with_rtu_mode`; selects the RTU header in `execute` and the
    /// value returned by `protocol`.
    // NOTE(review): `start_server` builds a `ModbusTcpServerV2` regardless of
    // this flag — confirm whether an RTU transport is expected here.
    rtu_mode: bool,
    /// Serial port name displayed in RTU mode.
    serial_port: Option<String>,
    /// Server instance created by `start_server`, kept so `stop_server` can reach it.
    server: Arc<Mutex<Option<Arc<ModbusTcpServerV2>>>>,
    /// Handle of the spawned task that drives the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
60
61impl ModbusCommand {
62 pub fn new() -> Self {
63 Self {
64 bind_addr: "0.0.0.0:502".parse().unwrap(),
65 devices: 1,
66 points_per_device: 100,
67 rtu_mode: false,
68 serial_port: None,
69 server: Arc::new(Mutex::new(None)),
70 server_task: Arc::new(Mutex::new(None)),
71 }
72 }
73
74 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
75 self.bind_addr = addr;
76 self
77 }
78
79 pub fn with_port(mut self, port: u16) -> Self {
80 self.bind_addr.set_port(port);
81 self
82 }
83
84 pub fn with_devices(mut self, devices: usize) -> Self {
85 self.devices = devices;
86 self
87 }
88
89 pub fn with_points(mut self, points: usize) -> Self {
90 self.points_per_device = points;
91 self
92 }
93
94 pub fn with_rtu_mode(mut self, serial_port: impl Into<String>) -> Self {
95 self.rtu_mode = true;
96 self.serial_port = Some(serial_port.into());
97 self
98 }
99}
100
101impl Default for ModbusCommand {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107#[async_trait]
108impl Command for ModbusCommand {
109 fn name(&self) -> &str {
110 "modbus"
111 }
112
113 fn description(&self) -> &str {
114 "Start a Modbus TCP/RTU simulator"
115 }
116
117 fn requires_engine(&self) -> bool {
118 true
119 }
120
121 fn supports_shutdown(&self) -> bool {
122 true
123 }
124
125 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
126 {
127 let output = ctx.output();
128 if self.rtu_mode {
129 output.header("Modbus RTU Simulator");
130 output.kv(
131 "Serial Port",
132 self.serial_port.as_deref().unwrap_or("N/A"),
133 );
134 } else {
135 output.header("Modbus TCP Simulator");
136 output.kv("Bind Address", self.bind_addr);
137 }
138 output.kv("Devices", self.devices);
139 output.kv("Points per Device", self.points_per_device);
140 output.kv("Total Points", self.devices * self.points_per_device);
141 }
142
143 self.start_server(ctx).await?;
144
145 let colors_enabled = ctx.colors_enabled();
146 let table = TableBuilder::new(colors_enabled)
147 .header(["Unit ID", "Holding Regs", "Input Regs", "Coils", "Discrete", "Status"])
148 .status_row(
149 [
150 "1",
151 &(self.points_per_device / 4).to_string(),
152 &(self.points_per_device / 4).to_string(),
153 &(self.points_per_device / 4).to_string(),
154 &(self.points_per_device / 4).to_string(),
155 "Online",
156 ],
157 StatusType::Success,
158 );
159 table.print();
160
161 ctx.output().info("Press Ctrl+C to stop");
162 ctx.shutdown_signal().notified().await;
163
164 self.stop_server(ctx).await?;
165 ctx.output().success("Modbus simulator stopped");
166
167 Ok(CommandOutput::quiet_success())
168 }
169}
170
171#[async_trait]
172impl ProtocolCommand for ModbusCommand {
173 fn protocol(&self) -> Protocol {
174 if self.rtu_mode {
175 Protocol::ModbusRtu
176 } else {
177 Protocol::ModbusTcp
178 }
179 }
180
181 fn default_port(&self) -> u16 {
182 502
183 }
184
185 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
186 let output = ctx.output();
187 let spinner = output.spinner("Starting Modbus server...");
188
189 let config = ServerConfigV2 {
190 bind_address: self.bind_addr,
191 ..Default::default()
192 };
193
194 let server = Arc::new(ModbusTcpServerV2::new(config));
195
196 for i in 0..self.devices {
197 let unit_id = (i + 1) as u8;
198 let points = (self.points_per_device / 4) as u16;
199 let device_config = ModbusDeviceConfig {
200 unit_id,
201 name: format!("Device-{}", unit_id),
202 holding_registers: points,
203 input_registers: points,
204 coils: points,
205 discrete_inputs: points,
206 response_delay_ms: 0,
207 };
208 let device = ModbusDevice::new(device_config);
209 server.add_device(device);
210 }
211
212 {
213 let mut server_guard = self.server.lock().await;
214 *server_guard = Some(server.clone());
215 }
216
217 let server_clone = server.clone();
218 let task = tokio::spawn(async move {
219 if let Err(e) = server_clone.run().await {
220 tracing::error!("Modbus server error: {}", e);
221 }
222 });
223
224 {
225 let mut task_guard = self.server_task.lock().await;
226 *task_guard = Some(task);
227 }
228
229 tokio::time::sleep(Duration::from_millis(100)).await;
230
231 spinner.finish_with_message(format!("Modbus server started on {}", self.bind_addr));
232 Ok(())
233 }
234
235 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
236 if let Some(server) = self.server.lock().await.as_ref() {
237 server.shutdown();
238 }
239
240 if let Some(task) = self.server_task.lock().await.take() {
241 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
242 }
243
244 Ok(())
245 }
246}
247
/// CLI command that runs an OPC UA server simulator.
pub struct OpcuaCommand {
    /// Socket address used as the host/port part of the endpoint URL.
    bind_addr: SocketAddr,
    /// Path appended to `opc.tcp://<bind_addr>` to form the endpoint URL.
    endpoint_path: String,
    /// Requested node count, as displayed by `execute`.
    // NOTE(review): `start_server` registers at most 100 variables — confirm
    // the cap is intended.
    nodes: usize,
    /// Security mode label displayed by `execute`.
    // NOTE(review): not passed into `OpcUaServerConfig` in `start_server` —
    // confirm whether it should be.
    security_mode: String,
    /// Server instance created by `start_server`, kept so `stop_server` can reach it.
    server: Arc<Mutex<Option<Arc<OpcUaServer>>>>,
    /// Handle of the spawned task that starts the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
263
264impl OpcuaCommand {
265 pub fn new() -> Self {
266 Self {
267 bind_addr: "0.0.0.0:4840".parse().unwrap(),
268 endpoint_path: "/".into(),
269 nodes: 1000,
270 security_mode: "None".into(),
271 server: Arc::new(Mutex::new(None)),
272 server_task: Arc::new(Mutex::new(None)),
273 }
274 }
275
276 pub fn with_port(mut self, port: u16) -> Self {
277 self.bind_addr.set_port(port);
278 self
279 }
280
281 pub fn with_endpoint(mut self, path: impl Into<String>) -> Self {
282 self.endpoint_path = path.into();
283 self
284 }
285
286 pub fn with_nodes(mut self, nodes: usize) -> Self {
287 self.nodes = nodes;
288 self
289 }
290
291 pub fn with_security(mut self, mode: impl Into<String>) -> Self {
292 self.security_mode = mode.into();
293 self
294 }
295}
296
297impl Default for OpcuaCommand {
298 fn default() -> Self {
299 Self::new()
300 }
301}
302
303#[async_trait]
304impl Command for OpcuaCommand {
305 fn name(&self) -> &str {
306 "opcua"
307 }
308
309 fn description(&self) -> &str {
310 "Start an OPC UA server simulator"
311 }
312
313 fn requires_engine(&self) -> bool {
314 true
315 }
316
317 fn supports_shutdown(&self) -> bool {
318 true
319 }
320
321 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
322 {
323 let output = ctx.output();
324 output.header("OPC UA Simulator");
325 output.kv("Endpoint", format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path));
326 output.kv("Nodes", self.nodes);
327 output.kv("Security Mode", &self.security_mode);
328 }
329
330 self.start_server(ctx).await?;
331
332 let colors_enabled = ctx.colors_enabled();
333 let table = TableBuilder::new(colors_enabled)
334 .header(["Namespace", "Nodes", "Subscriptions", "Status"])
335 .status_row(["0", "Standard", "0", "Ready"], StatusType::Info)
336 .status_row(
337 ["1", &self.nodes.to_string(), "0", "Online"],
338 StatusType::Success,
339 );
340 table.print();
341
342 ctx.output().info("Press Ctrl+C to stop");
343 ctx.shutdown_signal().notified().await;
344
345 self.stop_server(ctx).await?;
346 ctx.output().success("OPC UA simulator stopped");
347
348 Ok(CommandOutput::quiet_success())
349 }
350}
351
352#[async_trait]
353impl ProtocolCommand for OpcuaCommand {
354 fn protocol(&self) -> Protocol {
355 Protocol::OpcUa
356 }
357
358 fn default_port(&self) -> u16 {
359 4840
360 }
361
362 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
363 let output = ctx.output();
364 let spinner = output.spinner("Starting OPC UA server...");
365
366 let config = OpcUaServerConfig {
367 endpoint_url: format!("opc.tcp://{}{}", self.bind_addr, self.endpoint_path),
368 server_name: "Mabinogion OPC UA Simulator".to_string(),
369 max_subscriptions: 1000,
370 max_monitored_items: 10000,
371 ..Default::default()
372 };
373
374 let server = Arc::new(OpcUaServer::new(config).map_err(|e| {
375 crate::error::CliError::ExecutionFailed {
376 message: format!("Failed to create OPC UA server: {}", e)
377 }
378 })?);
379
380 for i in 0..self.nodes.min(100) {
382 let node_id = format!("ns=2;i={}", 1000 + i);
383 let _ = server.add_variable(node_id, format!("Variable_{}", i), (i as f64) * 0.1);
384 }
385
386 {
387 let mut server_guard = self.server.lock().await;
388 *server_guard = Some(server.clone());
389 }
390
391 let server_clone = server.clone();
392 let task = tokio::spawn(async move {
393 if let Err(e) = server_clone.start().await {
394 tracing::error!("OPC UA server error: {}", e);
395 }
396 });
397
398 {
399 let mut task_guard = self.server_task.lock().await;
400 *task_guard = Some(task);
401 }
402
403 tokio::time::sleep(Duration::from_millis(100)).await;
404
405 spinner.finish_with_message(format!("OPC UA server started on {}", self.bind_addr));
406 Ok(())
407 }
408
409 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
410 if let Some(server) = self.server.lock().await.as_ref() {
411 let _ = server.stop().await;
412 }
413
414 if let Some(task) = self.server_task.lock().await.take() {
415 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
416 }
417
418 Ok(())
419 }
420}
421
/// CLI command that runs a BACnet/IP simulator.
pub struct BacnetCommand {
    /// Socket address the server binds to.
    bind_addr: SocketAddr,
    /// Device instance number passed to the server configuration.
    device_instance: u32,
    /// Total object count; `start_server` creates a quarter of it for each of
    /// the four object types (any remainder is dropped by the division).
    objects: usize,
    /// BBMD flag displayed by `execute`.
    // NOTE(review): not passed into the server configuration in
    // `start_server` — confirm whether it should be.
    bbmd_enabled: bool,
    /// Server instance created by `start_server`, kept so `stop_server` can reach it.
    server: Arc<Mutex<Option<Arc<BACnetServer>>>>,
    /// Handle of the spawned task that drives the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
437
438impl BacnetCommand {
439 pub fn new() -> Self {
440 Self {
441 bind_addr: "0.0.0.0:47808".parse().unwrap(),
442 device_instance: 1234,
443 objects: 100,
444 bbmd_enabled: false,
445 server: Arc::new(Mutex::new(None)),
446 server_task: Arc::new(Mutex::new(None)),
447 }
448 }
449
450 pub fn with_port(mut self, port: u16) -> Self {
451 self.bind_addr.set_port(port);
452 self
453 }
454
455 pub fn with_device_instance(mut self, instance: u32) -> Self {
456 self.device_instance = instance;
457 self
458 }
459
460 pub fn with_objects(mut self, objects: usize) -> Self {
461 self.objects = objects;
462 self
463 }
464
465 pub fn with_bbmd(mut self, enabled: bool) -> Self {
466 self.bbmd_enabled = enabled;
467 self
468 }
469}
470
471impl Default for BacnetCommand {
472 fn default() -> Self {
473 Self::new()
474 }
475}
476
477#[async_trait]
478impl Command for BacnetCommand {
479 fn name(&self) -> &str {
480 "bacnet"
481 }
482
483 fn description(&self) -> &str {
484 "Start a BACnet/IP simulator"
485 }
486
487 fn requires_engine(&self) -> bool {
488 true
489 }
490
491 fn supports_shutdown(&self) -> bool {
492 true
493 }
494
495 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
496 {
497 let output = ctx.output();
498 output.header("BACnet/IP Simulator");
499 output.kv("Bind Address", self.bind_addr);
500 output.kv("Device Instance", self.device_instance);
501 output.kv("Objects", self.objects);
502 output.kv("BBMD", if self.bbmd_enabled { "Enabled" } else { "Disabled" });
503 }
504
505 self.start_server(ctx).await?;
506
507 let colors_enabled = ctx.colors_enabled();
508 let table = TableBuilder::new(colors_enabled)
509 .header(["Object Type", "Count", "Status"])
510 .status_row(["Device", "1", "Online"], StatusType::Success)
511 .status_row(["Analog Input", &(self.objects / 4).to_string(), "Active"], StatusType::Success)
512 .status_row(["Analog Output", &(self.objects / 4).to_string(), "Active"], StatusType::Success)
513 .status_row(["Binary Input", &(self.objects / 4).to_string(), "Active"], StatusType::Success)
514 .status_row(["Binary Output", &(self.objects / 4).to_string(), "Active"], StatusType::Success);
515 table.print();
516
517 ctx.output().info("Press Ctrl+C to stop");
518 ctx.shutdown_signal().notified().await;
519
520 self.stop_server(ctx).await?;
521 ctx.output().success("BACnet simulator stopped");
522
523 Ok(CommandOutput::quiet_success())
524 }
525}
526
527#[async_trait]
528impl ProtocolCommand for BacnetCommand {
529 fn protocol(&self) -> Protocol {
530 Protocol::BacnetIp
531 }
532
533 fn default_port(&self) -> u16 {
534 47808
535 }
536
537 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
538 let output = ctx.output();
539 let spinner = output.spinner("Starting BACnet server...");
540
541 let config = BacnetServerConfig::new(self.device_instance)
542 .with_bind_addr(self.bind_addr)
543 .with_device_name("Mabinogion BACnet Simulator");
544
545 let registry = ObjectRegistry::new();
547
548 let objects_per_type = self.objects / 4;
549 for i in 0..objects_per_type {
550 let ai = AnalogInput::new((i + 1) as u32, format!("AI_{}", i + 1));
551 registry.register(Arc::new(ai));
552 }
553 for i in 0..objects_per_type {
554 let ao = AnalogOutput::new((i + 1) as u32, format!("AO_{}", i + 1));
555 registry.register(Arc::new(ao));
556 }
557 for i in 0..objects_per_type {
558 let bi = BinaryInput::new((i + 1) as u32, format!("BI_{}", i + 1));
559 registry.register(Arc::new(bi));
560 }
561 for i in 0..objects_per_type {
562 let bo = BinaryOutput::new((i + 1) as u32, format!("BO_{}", i + 1));
563 registry.register(Arc::new(bo));
564 }
565
566 let server = Arc::new(BACnetServer::new(config, registry));
567
568 {
569 let mut server_guard = self.server.lock().await;
570 *server_guard = Some(server.clone());
571 }
572
573 let server_clone = server.clone();
574 let task = tokio::spawn(async move {
575 if let Err(e) = server_clone.run().await {
576 tracing::error!("BACnet server error: {}", e);
577 }
578 });
579
580 {
581 let mut task_guard = self.server_task.lock().await;
582 *task_guard = Some(task);
583 }
584
585 tokio::time::sleep(Duration::from_millis(100)).await;
586
587 spinner.finish_with_message(format!("BACnet server started on {}", self.bind_addr));
588 Ok(())
589 }
590
591 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
592 if let Some(server) = self.server.lock().await.as_ref() {
593 server.shutdown();
594 }
595
596 if let Some(task) = self.server_task.lock().await.take() {
597 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
598 }
599
600 Ok(())
601 }
602}
603
/// CLI command that runs a KNXnet/IP simulator.
pub struct KnxCommand {
    /// Socket address the server binds to.
    bind_addr: SocketAddr,
    /// Individual address in text form; parsed into an `IndividualAddress`
    /// by `start_server`.
    individual_address: String,
    /// Group object count displayed by `execute`.
    // NOTE(review): not used when the server is created in `start_server` —
    // confirm whether group objects should be registered.
    group_objects: usize,
    /// Server instance created by `start_server`; taken out again by `stop_server`.
    server: Arc<Mutex<Option<Arc<KnxServer>>>>,
    /// Handle of the spawned task that starts the server.
    server_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
618
619impl KnxCommand {
620 pub fn new() -> Self {
621 Self {
622 bind_addr: "0.0.0.0:3671".parse().unwrap(),
623 individual_address: "1.1.1".into(),
624 group_objects: 100,
625 server: Arc::new(Mutex::new(None)),
626 server_task: Arc::new(Mutex::new(None)),
627 }
628 }
629
630 pub fn with_port(mut self, port: u16) -> Self {
631 self.bind_addr.set_port(port);
632 self
633 }
634
635 pub fn with_individual_address(mut self, addr: impl Into<String>) -> Self {
636 self.individual_address = addr.into();
637 self
638 }
639
640 pub fn with_group_objects(mut self, count: usize) -> Self {
641 self.group_objects = count;
642 self
643 }
644}
645
646impl Default for KnxCommand {
647 fn default() -> Self {
648 Self::new()
649 }
650}
651
652#[async_trait]
653impl Command for KnxCommand {
654 fn name(&self) -> &str {
655 "knx"
656 }
657
658 fn description(&self) -> &str {
659 "Start a KNXnet/IP simulator"
660 }
661
662 fn requires_engine(&self) -> bool {
663 true
664 }
665
666 fn supports_shutdown(&self) -> bool {
667 true
668 }
669
670 async fn execute(&self, ctx: &mut CliContext) -> CliResult<CommandOutput> {
671 {
672 let output = ctx.output();
673 output.header("KNXnet/IP Simulator");
674 output.kv("Bind Address", self.bind_addr);
675 output.kv("Individual Address", &self.individual_address);
676 output.kv("Group Objects", self.group_objects);
677 }
678
679 self.start_server(ctx).await?;
680
681 let colors_enabled = ctx.colors_enabled();
682 let table = TableBuilder::new(colors_enabled)
683 .header(["Service", "Status"])
684 .status_row(["Core", "Ready"], StatusType::Success)
685 .status_row(["Device Management", "Ready"], StatusType::Success)
686 .status_row(["Tunneling", "Ready"], StatusType::Success);
687 table.print();
688
689 ctx.output().info("Press Ctrl+C to stop");
690 ctx.shutdown_signal().notified().await;
691
692 self.stop_server(ctx).await?;
693 ctx.output().success("KNX simulator stopped");
694
695 Ok(CommandOutput::quiet_success())
696 }
697}
698
699#[async_trait]
700impl ProtocolCommand for KnxCommand {
701 fn protocol(&self) -> Protocol {
702 Protocol::KnxIp
703 }
704
705 fn default_port(&self) -> u16 {
706 3671
707 }
708
709 async fn start_server(&self, ctx: &mut CliContext) -> CliResult<()> {
710 let output = ctx.output();
711 let spinner = output.spinner("Starting KNX server...");
712
713 let individual_address: IndividualAddress = self.individual_address.parse()
715 .map_err(|_| crate::error::CliError::ExecutionFailed {
716 message: format!("Invalid individual address: {}", self.individual_address)
717 })?;
718
719 let config = KnxServerConfig {
720 bind_addr: self.bind_addr,
721 individual_address,
722 max_connections: 10,
723 ..Default::default()
724 };
725
726 let server = Arc::new(KnxServer::new(config));
727
728 {
729 let mut server_guard = self.server.lock().await;
730 *server_guard = Some(server.clone());
731 }
732
733 let server_clone = server.clone();
734 let task = tokio::spawn(async move {
735 if let Err(e) = server_clone.start().await {
736 tracing::error!("KNX server error: {}", e);
737 }
738 });
739
740 {
741 let mut task_guard = self.server_task.lock().await;
742 *task_guard = Some(task);
743 }
744
745 tokio::time::sleep(Duration::from_millis(100)).await;
746
747 spinner.finish_with_message(format!("KNX server started on {}", self.bind_addr));
748 Ok(())
749 }
750
751 async fn stop_server(&self, _ctx: &mut CliContext) -> CliResult<()> {
752 let server_opt = self.server.lock().await.take();
754 if let Some(server) = server_opt {
755 let _ = tokio::task::spawn_blocking(move || {
757 let rt = tokio::runtime::Handle::current();
758 rt.block_on(async {
759 let _ = server.stop().await;
760 })
761 }).await;
762 }
763
764 if let Some(task) = self.server_task.lock().await.take() {
765 let _ = tokio::time::timeout(Duration::from_secs(5), task).await;
766 }
767
768 Ok(())
769 }
770}