autocore_std/lib.rs
1//! # AutoCore Standard Library
2//!
3//! The standard library for writing AutoCore control programs. This crate provides
4//! everything you need to build real-time control applications that integrate with
5//! the AutoCore server ecosystem.
6//!
7//! ## Overview
8//!
9//! AutoCore control programs run as separate processes that communicate with the
10//! autocore-server via shared memory and IPC. This library handles all the low-level
11//! details, allowing you to focus on your control logic.
12//!
13//! ```text
14//! ┌─────────────────────────┐ ┌─────────────────────────┐
15//! │ autocore-server │ │ Your Control Program │
16//! │ │ │ │
17//! │ ┌─────────────────┐ │ │ ┌─────────────────┐ │
18//! │ │ Shared Memory │◄───┼─────┼──│ ControlRunner │ │
19//! │ │ (GlobalMemory) │ │ │ │ │ │
20//! │ └─────────────────┘ │ │ │ ┌─────────────┐ │ │
21//! │ │ │ │ │ Your Logic │ │ │
22//! │ ┌─────────────────┐ │ │ │ └─────────────┘ │ │
23//! │ │ Tick Signal │────┼─────┼──│ │ │
24//! │ └─────────────────┘ │ │ └─────────────────┘ │
25//! └─────────────────────────┘ └─────────────────────────┘
26//! ```
27//!
28//! ## Quick Start
29//!
30//! 1. Create a new control project using `acctl`:
31//! ```bash
32//! acctl clone <server-ip> <project-name>
33//! ```
34//!
35//! 2. Implement the [`ControlProgram`] trait:
36//! ```ignore
37//! use autocore_std::{ControlProgram, TickContext};
38//! use autocore_std::fb::RTrig;
39//!
40//! // GlobalMemory is generated from your project.json
41//! mod gm;
42//! use gm::GlobalMemory;
43//!
44//! pub struct MyProgram {
45//! start_button: RTrig,
46//! }
47//!
48//! impl MyProgram {
49//! pub fn new() -> Self {
50//! Self {
51//! start_button: RTrig::new(),
52//! }
53//! }
54//! }
55//!
56//! impl ControlProgram for MyProgram {
57//! type Memory = GlobalMemory;
58//!
59//! fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
60//! // Detect rising edge on start button
61//! if self.start_button.call(ctx.gm.inputs.start_button) {
62//! ctx.gm.outputs.motor_running = true;
63//! autocore_std::log::info!("Motor started!");
64//! }
65//! }
66//! }
67//! ```
68//!
69//! 3. Use the [`autocore_main!`] macro for the entry point:
70//! ```ignore
71//! autocore_std::autocore_main!(MyProgram, "my_project_shm", "tick");
72//! ```
73//!
74//! ## Function Blocks (IEC 61131-3 Inspired)
75//!
76//! This library includes standard function blocks commonly used in PLC programming:
77//!
78//! - [`fb::RTrig`] - Rising edge detector (false→true transition)
79//! - [`fb::FTrig`] - Falling edge detector (true→false transition)
80//! - [`fb::Ton`] - Timer On Delay (output after delay)
81//! - [`fb::BitResetOnDelay`] - Resets a boolean after it has been true for a duration
82//! - [`fb::SimpleTimer`] - Simple one-shot timer (NOT IEC 61131-3, for imperative use)
83//! - [`fb::StateMachine`] - State machine helper with automatic timer management
84//! - [`fb::RunningAverage`] - Accumulates values and computes their arithmetic mean
85//! - [`fb::Beeper`] - Audible beeper controller with configurable beep sequences
86//! - [`fb::Heartbeat`] - Monitors a remote heartbeat counter for connection loss
87//!
88//! ### Example: Edge Detection
89//!
90//! ```
91//! use autocore_std::fb::RTrig;
92//!
93//! let mut trigger = RTrig::new();
94//!
95//! // First call with false - no edge
96//! assert_eq!(trigger.call(false), false);
97//!
98//! // Rising edge detected!
99//! assert_eq!(trigger.call(true), true);
100//!
101//! // Still true, but no edge (already high)
102//! assert_eq!(trigger.call(true), false);
103//!
104//! // Back to false
105//! assert_eq!(trigger.call(false), false);
106//!
107//! // Another rising edge
108//! assert_eq!(trigger.call(true), true);
109//! ```
110//!
111//! ### Example: Timer
112//!
113//! ```
114//! use autocore_std::fb::Ton;
115//! use std::time::Duration;
116//!
117//! let mut timer = Ton::new();
118//! let delay = Duration::from_millis(100);
119//!
120//! // Timer not enabled - output is false
121//! assert_eq!(timer.call(false, delay), false);
122//!
123//! // Enable timer - starts counting
124//! assert_eq!(timer.call(true, delay), false);
125//!
126//! // Still counting...
127//! std::thread::sleep(Duration::from_millis(50));
128//! assert_eq!(timer.call(true, delay), false);
129//! assert!(timer.et < delay); // Elapsed time < preset
130//!
131//! // After delay elapsed
132//! std::thread::sleep(Duration::from_millis(60));
133//! assert_eq!(timer.call(true, delay), true); // Output is now true!
134//! ```
135//!
136//! ## Logging
137//!
138//! Control programs can send log messages to the autocore-server for display in the
139//! web console. Logging is handled automatically when using [`ControlRunner`].
140//!
141//! ```ignore
142//! use autocore_std::log;
143//!
144//! log::trace!("Detailed trace message");
145//! log::debug!("Debug information");
146//! log::info!("Normal operation message");
147//! log::warn!("Warning condition detected");
148//! log::error!("Error occurred!");
149//! ```
150//!
151//! See the [`logger`] module for advanced configuration.
152//!
153//! ## Memory Synchronization
154//!
155//! The [`ControlRunner`] handles all shared memory synchronization automatically:
156//!
157//! 1. **Wait for tick** - Blocks until the server signals a new cycle
158//! 2. **Read inputs** - Copies shared memory to local buffer (atomic snapshot)
159//! 3. **Execute logic** - Your `process_tick` runs on the local buffer
160//! 4. **Write outputs** - Copies local buffer back to shared memory
161//!
162//! This ensures your control logic always sees a consistent view of the data,
163//! even when other processes are modifying shared memory.
164
165#![warn(missing_docs)]
166#![warn(rustdoc::missing_crate_level_docs)]
167#![doc(html_root_url = "https://docs.rs/autocore-std/3.3.0")]
168
169use anyhow::{anyhow, Result};
170use futures_util::{SinkExt, StreamExt};
171use log::LevelFilter;
172use mechutil::ipc::{CommandMessage, MessageType};
173use raw_sync::events::{Event, EventInit, EventState};
174use raw_sync::Timeout;
175use shared_memory::ShmemConf;
176use std::collections::HashMap;
177use std::sync::atomic::{fence, Ordering, AtomicBool};
178use std::sync::Arc;
179use std::time::Duration;
180use tokio_tungstenite::{connect_async, tungstenite::Message};
181
182/// UDP logger for sending log messages to autocore-server.
183///
184/// This module provides a non-blocking logger implementation that sends log messages
185/// via UDP to the autocore-server. Messages are batched and sent asynchronously to
186/// avoid impacting the control loop timing.
187///
188/// # Example
189///
190/// ```ignore
191/// use autocore_std::logger;
192/// use log::LevelFilter;
193///
194/// // Initialize the logger (done automatically by ControlRunner)
195/// logger::init_udp_logger("127.0.0.1", 39101, LevelFilter::Info, "control")?;
196///
197/// // Now you can use the log macros
198/// log::info!("System initialized");
199/// ```
200pub mod logger;
201
202// Re-export log crate for convenience - control programs can use autocore_std::log::info!() etc.
203pub use log;
204
205/// Function blocks for control programs (IEC 61131-3 inspired).
206pub mod fb;
207
208/// Interface protocols for communication between control programs and external sources.
209pub mod iface;
210
211/// Client for sending IPC commands to external modules via WebSocket.
212pub mod command_client;
213pub use command_client::CommandClient;
214
215/// EtherCAT utilities (SDO client, etc.).
216pub mod ethercat;
217
218/// CiA 402 motion control: axis abstraction, traits, and types.
219pub mod motion;
220
221/// Shared memory utilities for external modules.
222pub mod shm;
223
224/// Lightweight process diagnostics (FD count, RSS).
225pub mod diagnostics;
226
227/// Banner Engineering device helpers (WLS15 IO-Link light strip, etc.).
228pub mod banner;
229
230/// Fixed-length string type for shared memory variables.
231pub mod fixed_string;
232pub use fixed_string::FixedString;
233
234// ============================================================================
235// Core Framework
236// ============================================================================
237
/// Marker trait identifying a generated `GlobalMemory` struct.
///
/// The code generator emits an implementation of this trait for the
/// `GlobalMemory` type that mirrors the shared memory layout. The trait has
/// no methods; it exists purely so the control framework can express
/// "this type is a shared memory image" in its type signatures.
///
/// Control programs never need to implement it by hand - the generated
/// code already does.
pub trait AutoCoreMemory {}
247
248/// Trait for detecting changes in memory structures.
249pub trait ChangeTracker {
250 /// Compare self with a previous state and return a list of changed fields.
251 /// Returns a vector of (field_name, new_value).
252 fn get_changes(&self, prev: &Self) -> Vec<(&'static str, serde_json::Value)>;
253
254 /// Unpack bit-mapped variables from their source words.
255 /// Called automatically after reading shared memory, before `process_tick`.
256 /// Auto-generated by codegen when bit-mapped variables exist; default is no-op.
257 fn unpack_bits(&mut self) {}
258
259 /// Pack bit-mapped variables back into their source words.
260 /// Called automatically after `process_tick`, before writing shared memory.
261 /// Auto-generated by codegen when bit-mapped variables exist; default is no-op.
262 fn pack_bits(&mut self) {}
263}
264
265/// Per-tick context passed to the control program by the framework.
266///
267/// `TickContext` bundles all per-cycle data into a single struct so that the
268/// [`ControlProgram::process_tick`] signature stays stable as new fields are
269/// added in the future (e.g., delta time, diagnostics).
270///
271/// The framework constructs a fresh `TickContext` each cycle, calls
272/// [`CommandClient::poll`] before handing it to the program, and writes
273/// the memory back to shared memory after `process_tick` returns.
274pub struct TickContext<'a, M> {
275 /// Mutable reference to the local shared memory copy.
276 pub gm: &'a mut M,
277 /// IPC command client for communicating with external modules.
278 pub client: &'a mut CommandClient,
279 /// Current cycle number (starts at 1, increments each tick).
280 pub cycle: u64,
281}
282
283/// The trait that defines a control program's logic.
284///
285/// Implement this trait to create your control program. The associated `Memory`
286/// type should be the generated `GlobalMemory` struct from your project.
287///
288/// # Memory Type Requirements
289///
290/// The `Memory` type must implement `Copy` to allow efficient synchronization
291/// between shared memory and local buffers. This is automatically satisfied
292/// by the generated `GlobalMemory` struct.
293///
294/// # Lifecycle
295///
296/// 1. `initialize` is called once at startup
297/// 2. `process_tick` is called repeatedly in the control loop with a
298/// [`TickContext`] that provides shared memory, the IPC client, and the
299/// current cycle number.
300///
301/// # Example
302///
303/// ```ignore
304/// use autocore_std::{ControlProgram, TickContext};
305///
306/// mod gm;
307/// use gm::GlobalMemory;
308///
309/// pub struct MyController {
310/// cycle_counter: u64,
311/// }
312///
313/// impl MyController {
314/// pub fn new() -> Self {
315/// Self { cycle_counter: 0 }
316/// }
317/// }
318///
319/// impl ControlProgram for MyController {
320/// type Memory = GlobalMemory;
321///
322/// fn initialize(&mut self, mem: &mut GlobalMemory) {
323/// // Set initial output states
324/// mem.outputs.ready = true;
325/// log::info!("Controller initialized");
326/// }
327///
328/// fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
329/// self.cycle_counter = ctx.cycle;
330///
331/// // Your control logic here
332/// if ctx.gm.inputs.start && !ctx.gm.inputs.estop {
333/// ctx.gm.outputs.running = true;
334/// }
335/// }
336/// }
337/// ```
338pub trait ControlProgram {
339 /// The shared memory structure type (usually the generated `GlobalMemory`).
340 ///
341 /// Must implement `Copy` to allow efficient memory synchronization.
342 type Memory: Copy + ChangeTracker;
343
344 /// Called once when the control program starts.
345 ///
346 /// Use this to initialize output states, reset counters, or perform
347 /// any one-time setup. The default implementation does nothing.
348 ///
349 /// # Arguments
350 ///
351 /// * `mem` - Mutable reference to the shared memory. Changes are written
352 /// back to shared memory after this method returns.
353 fn initialize(&mut self, _mem: &mut Self::Memory) {}
354
355 /// The main control loop - called once per scan cycle.
356 ///
357 /// This is where your control logic lives. Read inputs from `ctx.gm`,
358 /// perform calculations, and write outputs back to `ctx.gm`. Use
359 /// `ctx.client` for IPC commands and `ctx.cycle` for the current cycle
360 /// number.
361 ///
362 /// The framework calls [`CommandClient::poll`] before each invocation,
363 /// so incoming responses are already buffered when your code runs.
364 ///
365 /// # Arguments
366 ///
367 /// * `ctx` - A [`TickContext`] containing the local shared memory copy,
368 /// the IPC command client, and the current cycle number.
369 ///
370 /// # Timing
371 ///
372 /// This method should complete within the scan cycle time. Long-running
373 /// operations will cause cycle overruns.
374 fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>);
375}
376
377/// Configuration for the [`ControlRunner`].
378///
379/// Specifies connection parameters, shared memory names, and logging settings.
380/// Use [`Default::default()`] for typical configurations.
381///
382/// # Example
383///
384/// ```
385/// use autocore_std::RunnerConfig;
386/// use log::LevelFilter;
387///
388/// let config = RunnerConfig {
389/// server_host: "192.168.1.100".to_string(),
390/// module_name: "my_controller".to_string(),
391/// shm_name: "my_project_shm".to_string(),
392/// tick_signal_name: "tick".to_string(),
393/// busy_signal_name: Some("busy".to_string()),
394/// log_level: LevelFilter::Debug,
395/// ..Default::default()
396/// };
397/// ```
398#[derive(Debug, Clone)]
399pub struct RunnerConfig {
400 /// Server host address (default: "127.0.0.1")
401 pub server_host: String,
402 /// WebSocket port for commands (default: 11969)
403 pub ws_port: u16,
404 /// Module name for identification (default: "control")
405 pub module_name: String,
406 /// Shared memory segment name (must match server configuration)
407 pub shm_name: String,
408 /// Name of the tick signal in shared memory (triggers each scan cycle)
409 pub tick_signal_name: String,
410 /// Optional name of the busy signal (set when cycle completes)
411 pub busy_signal_name: Option<String>,
412 /// Minimum log level to send to the server (default: Info)
413 pub log_level: LevelFilter,
414 /// UDP port for sending logs to the server (default: 39101)
415 pub log_udp_port: u16,
416}
417
/// WebSocket port that autocore-server listens on unless configured otherwise.
pub const DEFAULT_WS_PORT: u16 = 11_969;
420
421impl Default for RunnerConfig {
422 fn default() -> Self {
423 Self {
424 server_host: "127.0.0.1".to_string(),
425 ws_port: DEFAULT_WS_PORT,
426 module_name: "control".to_string(),
427 shm_name: "autocore_cyclic".to_string(),
428 tick_signal_name: "tick".to_string(),
429 busy_signal_name: None,
430 log_level: LevelFilter::Info,
431 log_udp_port: logger::DEFAULT_LOG_UDP_PORT,
432 }
433 }
434}
435
436
437/// The main execution engine for control programs.
438///
439/// `ControlRunner` handles all the infrastructure required to run a control program:
440///
441/// - Reading memory layout from the server's layout file
442/// - Opening and mapping shared memory
443/// - Setting up synchronization signals
444/// - Running the real-time control loop
445/// - Sending log messages to the server
446///
447/// # Usage
448///
449/// ```ignore
450/// use autocore_std::{ControlRunner, RunnerConfig};
451///
452/// let config = RunnerConfig {
453/// shm_name: "my_project_shm".to_string(),
454/// tick_signal_name: "tick".to_string(),
455/// ..Default::default()
456/// };
457///
458/// ControlRunner::new(MyProgram::new())
459/// .config(config)
460/// .run()?; // Blocks forever
461/// ```
462///
463/// # Control Loop
464///
465/// The runner executes a synchronous control loop:
466///
467/// 1. **Wait** - Blocks until the tick signal is set by the server
468/// 2. **Read** - Copies shared memory to a local buffer (acquire barrier)
469/// 3. **Execute** - Calls your `process_tick` method
470/// 4. **Write** - Copies local buffer back to shared memory (release barrier)
471/// 5. **Signal** - Sets the busy signal (if configured) to indicate completion
472///
473/// This ensures your code always sees a consistent snapshot of the data
474/// and that your writes are atomically visible to other processes.
475pub struct ControlRunner<P: ControlProgram> {
476 config: RunnerConfig,
477 program: P,
478}
479
480impl<P: ControlProgram> ControlRunner<P> {
481 /// Creates a new runner for the given control program.
482 ///
483 /// Uses default configuration. Call [`.config()`](Self::config) to customize.
484 ///
485 /// # Arguments
486 ///
487 /// * `program` - Your control program instance
488 ///
489 /// # Example
490 ///
491 /// ```ignore
492 /// let runner = ControlRunner::new(MyProgram::new());
493 /// ```
494 pub fn new(program: P) -> Self {
495 Self {
496 config: RunnerConfig::default(),
497 program,
498 }
499 }
500
501 /// Sets the configuration for this runner.
502 ///
503 /// # Arguments
504 ///
505 /// * `config` - The configuration to use
506 ///
507 /// # Example
508 ///
509 /// ```ignore
510 /// ControlRunner::new(MyProgram::new())
511 /// .config(RunnerConfig {
512 /// shm_name: "custom_shm".to_string(),
513 /// ..Default::default()
514 /// })
515 /// .run()?;
516 /// ```
517 pub fn config(mut self, config: RunnerConfig) -> Self {
518 self.config = config;
519 self
520 }
521
522 /// Starts the control loop.
523 ///
524 /// This method blocks indefinitely, running the control loop until
525 /// an error occurs or the process is terminated.
526 ///
527 /// # Returns
528 ///
529 /// Returns `Ok(())` only if the loop exits cleanly (which typically
530 /// doesn't happen). Returns an error if:
531 ///
532 /// - IPC connection fails
533 /// - Shared memory cannot be opened
534 /// - Signal offsets cannot be found
535 /// - A critical error occurs during execution
536 ///
537 /// # Example
538 ///
539 /// ```ignore
540 /// fn main() -> anyhow::Result<()> {
541 /// ControlRunner::new(MyProgram::new())
542 /// .config(config)
543 /// .run()
544 /// }
545 /// ```
546 pub fn run(mut self) -> Result<()> {
547 // Initialize UDP logger FIRST (before any log statements)
548 if let Err(e) = logger::init_udp_logger(
549 &self.config.server_host,
550 self.config.log_udp_port,
551 self.config.log_level,
552 "control",
553 ) {
554 eprintln!("Warning: Failed to initialize UDP logger: {}", e);
555 // Continue anyway - logging will just go nowhere
556 }
557
558 // Multi-threaded runtime so spawned WS read/write tasks can run
559 // alongside the synchronous control loop.
560 let rt = tokio::runtime::Builder::new_multi_thread()
561 .worker_threads(2)
562 .enable_all()
563 .build()?;
564
565 rt.block_on(async {
566 log::info!("AutoCore Control Runner Starting...");
567
568 // 1. Connect to server via WebSocket and get layout
569 let ws_url = format!("ws://{}:{}/ws/", self.config.server_host, self.config.ws_port);
570 log::info!("Connecting to server at {}", ws_url);
571
572 let (ws_stream, _) = connect_async(&ws_url).await
573 .map_err(|e| anyhow!("Failed to connect to server at {}: {}", ws_url, e))?;
574
575 let (mut write, mut read) = ws_stream.split();
576
577 // Send gm.get_layout request
578 let request = CommandMessage::request("gm.get_layout", serde_json::Value::Null);
579 let transaction_id = request.transaction_id;
580 let request_json = serde_json::to_string(&request)?;
581
582 write.send(Message::Text(request_json)).await
583 .map_err(|e| anyhow!("Failed to send layout request: {}", e))?;
584
585 // Wait for response with matching transaction_id
586 let timeout = Duration::from_secs(10);
587 let start = std::time::Instant::now();
588 let mut layout: Option<HashMap<String, serde_json::Value>> = None;
589
590 while start.elapsed() < timeout {
591 match tokio::time::timeout(Duration::from_secs(1), read.next()).await {
592 Ok(Some(Ok(Message::Text(text)))) => {
593 if let Ok(response) = serde_json::from_str::<CommandMessage>(&text) {
594 if response.transaction_id == transaction_id {
595 if !response.success {
596 return Err(anyhow!("Server error: {}", response.error_message));
597 }
598 layout = Some(serde_json::from_value(response.data)?);
599 break;
600 }
601 // Skip broadcasts and other messages
602 if response.message_type == MessageType::Broadcast {
603 continue;
604 }
605 }
606 }
607 Ok(Some(Ok(_))) => continue,
608 Ok(Some(Err(e))) => return Err(anyhow!("WebSocket error: {}", e)),
609 Ok(None) => return Err(anyhow!("Server closed connection")),
610 Err(_) => continue, // Timeout on single read, keep trying
611 }
612 }
613
614 let layout = layout.ok_or_else(|| anyhow!("Timeout waiting for layout response"))?;
615 log::info!("Layout received with {} entries.", layout.len());
616
617 // Set up channels and background tasks for shared WebSocket access.
618 // This allows both the control loop (gm.write) and CommandClient (IPC
619 // commands) to share the write half, while routing incoming responses
620 // to the CommandClient.
621 let (ws_write_tx, mut ws_write_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
622 let (response_tx, response_rx) = tokio::sync::mpsc::unbounded_channel::<CommandMessage>();
623
624 // Background task: WS write loop
625 // Reads serialized messages from ws_write_rx and sends them over the WebSocket.
626 tokio::spawn(async move {
627 while let Some(msg_json) = ws_write_rx.recv().await {
628 if let Err(e) = write.send(Message::Text(msg_json)).await {
629 log::error!("WebSocket write error: {}", e);
630 break;
631 }
632 }
633 });
634
635 // Background task: WS read loop
636 // Reads all incoming WebSocket messages. Routes Response messages to
637 // response_tx for the CommandClient; ignores broadcasts and others.
638 tokio::spawn(async move {
639 while let Some(result) = read.next().await {
640 match result {
641 Ok(Message::Text(text)) => {
642 if let Ok(msg) = serde_json::from_str::<CommandMessage>(&text) {
643 if msg.message_type == MessageType::Response {
644 if response_tx.send(msg).is_err() {
645 break; // receiver dropped
646 }
647 }
648 // Broadcasts and other message types are ignored
649 }
650 }
651 Ok(Message::Close(_)) => {
652 log::info!("WebSocket closed by server");
653 break;
654 }
655 Err(e) => {
656 log::error!("WebSocket read error: {}", e);
657 break;
658 }
659 _ => {} // Ping/Pong/Binary - ignore
660 }
661 }
662 });
663
664 // Construct CommandClient — owned by the runner, passed to the
665 // program via TickContext each cycle.
666 let mut command_client = CommandClient::new(ws_write_tx.clone(), response_rx);
667
668 // 2. Find Signal Offsets
669 let tick_offset = self.find_offset(&layout, &self.config.tick_signal_name)?;
670 let busy_offset = if let Some(name) = &self.config.busy_signal_name {
671 Some(self.find_offset(&layout, name)?)
672 } else {
673 None
674 };
675
676 // 4. Open Shared Memory
677 let shmem = ShmemConf::new().os_id(&self.config.shm_name).open()?;
678 let base_ptr = shmem.as_ptr();
679 log::info!("Shared Memory '{}' mapped.", self.config.shm_name);
680
681 // 5. Setup Pointers
682 // SAFETY: We trust the server's layout matches the generated GlobalMemory struct.
683 let gm = unsafe { &mut *(base_ptr as *mut P::Memory) };
684
685 // Get tick event from shared memory
686 log::info!("Setting up tick event at offset {} (base_ptr: {:p})", tick_offset, base_ptr);
687 let (tick_event, _) = unsafe {
688 Event::from_existing(base_ptr.add(tick_offset))
689 }.map_err(|e| anyhow!("Failed to open tick event: {:?}", e))?;
690 log::info!("Tick event ready");
691
692 // Busy signal event (optional)
693 let busy_event = busy_offset.map(|offset| {
694 unsafe { Event::from_existing(base_ptr.add(offset)) }
695 .map(|(event, _)| event)
696 .ok()
697 }).flatten();
698
699 // 6. Initialize local memory buffer and user program
700 // We use a local copy for the control loop to ensure:
701 // - Consistent snapshot of inputs at start of cycle
702 // - Atomic commit of outputs at end of cycle
703 // - Proper memory barriers for cross-process visibility
704 let mut local_mem: P::Memory = unsafe { std::ptr::read_volatile(gm) };
705 let mut prev_mem: P::Memory = local_mem; // Snapshot for change detection
706
707 fence(Ordering::Acquire); // Ensure we see all prior writes from other processes
708
709 self.program.initialize(&mut local_mem);
710
711 // Write back any changes from initialize
712 fence(Ordering::Release);
713 unsafe { std::ptr::write_volatile(gm, local_mem) };
714
715 // Set up signal handler for graceful shutdown
716 let running = Arc::new(AtomicBool::new(true));
717 let r = running.clone();
718
719 // Only set handler if not already set
720 if let Err(e) = ctrlc::set_handler(move || {
721 r.store(false, Ordering::SeqCst);
722 }) {
723 log::warn!("Failed to set signal handler: {}", e);
724 }
725
726 log::info!("Entering Control Loop - waiting for first tick...");
727 let mut cycle_count: u64 = 0;
728 let mut consecutive_timeouts: u32 = 0;
729
730 while running.load(Ordering::SeqCst) {
731 // Wait for Tick - Event-based synchronization
732 // Use a timeout (1s) to allow checking the running flag periodically
733 match tick_event.wait(Timeout::Val(Duration::from_secs(1))) {
734 Ok(_) => {
735 consecutive_timeouts = 0;
736 },
737 Err(e) => {
738 // Check for timeout
739 let err_str = format!("{:?}", e);
740 if err_str.contains("Timeout") {
741 consecutive_timeouts += 1;
742 if consecutive_timeouts == 10 {
743 log::error!(
744 "TICK STALL: {} consecutive timeouts! cycle={} pending={} responses={} fds={} rss_kb={}",
745 consecutive_timeouts,
746 cycle_count,
747 command_client.pending_count(),
748 command_client.response_count(),
749 diagnostics::count_open_fds(),
750 diagnostics::get_rss_kb(),
751 );
752 }
753 if consecutive_timeouts > 10 && consecutive_timeouts % 60 == 0 {
754 log::error!(
755 "TICK STALL continues: {} consecutive timeouts, cycle={}",
756 consecutive_timeouts,
757 cycle_count,
758 );
759 }
760 continue;
761 }
762 return Err(anyhow!("Tick wait failed: {:?}", e));
763 }
764 }
765
766 if !running.load(Ordering::SeqCst) {
767 log::info!("Shutdown signal received, exiting control loop.");
768 break;
769 }
770
771 cycle_count += 1;
772 if cycle_count == 1 {
773 log::info!("First tick received!");
774 }
775
776 // // Periodic diagnostics (every 30s at 100 Hz)
777 // if cycle_count % 3000 == 0 {
778 // log::info!(
779 // "DIAG cycle={} pending={} responses={} fds={} rss_kb={}",
780 // cycle_count,
781 // command_client.pending_count(),
782 // command_client.response_count(),
783 // diagnostics::count_open_fds(),
784 // diagnostics::get_rss_kb(),
785 // );
786 // }
787
788 // === INPUT PHASE ===
789 // Read all variables from shared memory into local buffer.
790 // This gives us a consistent snapshot of inputs for this cycle.
791 // Acquire fence ensures we see all writes from other processes (server, modules).
792 local_mem = unsafe { std::ptr::read_volatile(gm) };
793
794 // Update prev_mem before execution to track changes made IN THIS CYCLE
795 // Actually, we want to know what changed in SHM relative to what we last knew,
796 // OR what WE changed relative to what we read?
797 // The user wants "writes on shared variables" to be broadcast.
798 // Typically outputs.
799 // If inputs changed (from other source), broadcasting them again is fine too.
800 // Let's capture state BEFORE execution (which is what we just read from SHM).
801 prev_mem = local_mem;
802
803 fence(Ordering::Acquire);
804
805 // Unpack bit-mapped variables from their source words.
806 local_mem.unpack_bits();
807
808 // === EXECUTE PHASE ===
809 // Poll IPC responses so they are available during process_tick.
810 command_client.poll();
811
812 // Execute user logic on the local copy.
813 // All reads/writes during process_tick operate on local_mem.
814 let mut ctx = TickContext {
815 gm: &mut local_mem,
816 client: &mut command_client,
817 cycle: cycle_count,
818 };
819 self.program.process_tick(&mut ctx);
820
821 // === OUTPUT PHASE ===
822 // Pack bit-mapped variables back into their source words.
823 local_mem.pack_bits();
824
825 // Write all variables from local buffer back to shared memory.
826 // Release fence ensures our writes are visible to other processes.
827 fence(Ordering::Release);
828 unsafe { std::ptr::write_volatile(gm, local_mem) };
829
830 // === CHANGE DETECTION & NOTIFICATION ===
831 let changes = local_mem.get_changes(&prev_mem);
832 if !changes.is_empty() {
833 // Construct bulk write message
834 let mut data_map = serde_json::Map::new();
835 for (key, val) in changes {
836 data_map.insert(key.to_string(), val);
837 }
838
839 let msg = CommandMessage::request("gm.write", serde_json::Value::Object(data_map));
840 let msg_json = serde_json::to_string(&msg).unwrap_or_default();
841
842 // Send via the shared write channel (non-blocking)
843 if let Err(e) = ws_write_tx.send(msg_json) {
844 log::error!("Failed to send updates: {}", e);
845 }
846 }
847
848 // Signal Busy/Done event
849 if let Some(ref busy_ev) = busy_event {
850 let _ = busy_ev.set(EventState::Signaled);
851 }
852 }
853
854 Ok(())
855 })
856 }
857
858 fn find_offset(&self, layout: &HashMap<String, serde_json::Value>, name: &str) -> Result<usize> {
859 let info = layout.get(name).ok_or_else(|| anyhow!("Signal '{}' not found in layout", name))?;
860 info.get("offset")
861 .and_then(|v| v.as_u64())
862 .map(|v| v as usize)
863 .ok_or_else(|| anyhow!("Invalid offset for '{}'", name))
864 }
865}
866
/// Generates the standard `main` function for a control program.
///
/// This macro reduces boilerplate by creating a properly configured `main`
/// function that initializes and runs your control program.
///
/// The generated `main` returns `anyhow::Result<()>`, so the crate invoking
/// the macro needs `anyhow` as a dependency. All other paths are resolved
/// through `$crate`, so the macro keeps working if this crate is renamed in
/// the caller's `Cargo.toml`.
///
/// # Arguments
///
/// * `$prog_type` - The type of your control program (must implement
///   [`ControlProgram`] and provide a `new()` constructor without arguments)
/// * `$shm_name` - The shared memory segment name (string literal)
/// * `$tick_signal` - The tick signal name in shared memory (string literal)
///
/// A trailing comma after the last argument is accepted.
///
/// # Example
///
/// ```ignore
/// mod gm;
/// use gm::GlobalMemory;
///
/// pub struct MyProgram;
///
/// impl MyProgram {
///     pub fn new() -> Self { Self }
/// }
///
/// impl autocore_std::ControlProgram for MyProgram {
///     type Memory = GlobalMemory;
///
///     fn process_tick(&mut self, ctx: &mut autocore_std::TickContext<Self::Memory>) {
///         // Your logic here
///     }
/// }
///
/// // This generates the main function
/// autocore_std::autocore_main!(MyProgram, "my_project_shm", "tick");
/// ```
///
/// # Generated Code
///
/// The macro expands to:
///
/// ```ignore
/// fn main() -> anyhow::Result<()> {
///     let config = autocore_std::RunnerConfig {
///         shm_name: "my_project_shm".to_string(),
///         tick_signal_name: "tick".to_string(),
///         // Every other field takes its default value.
///         ..autocore_std::RunnerConfig::default()
///     };
///
///     autocore_std::ControlRunner::new(MyProgram::new())
///         .config(config)
///         .run()
/// }
/// ```
#[macro_export]
macro_rules! autocore_main {
    ($prog_type:ty, $shm_name:expr, $tick_signal:expr $(,)?) => {
        fn main() -> anyhow::Result<()> {
            // Only the two caller-supplied names are set explicitly; the
            // remaining fields come from `RunnerConfig::default()` so the
            // macro cannot drift from the library defaults.
            let config = $crate::RunnerConfig {
                shm_name: $shm_name.to_string(),
                tick_signal_name: $tick_signal.to_string(),
                ..$crate::RunnerConfig::default()
            };

            $crate::ControlRunner::new(<$prog_type>::new())
                .config(config)
                .run()
        }
    };
}
945