// autocore-std 3.3.21 - Standard library for AutoCore control programs: shared memory, IPC, and logging utilities.
//! # AutoCore Standard Library
//!
//! The standard library for writing AutoCore control programs. This crate provides
//! everything you need to build real-time control applications that integrate with
//! the AutoCore server ecosystem.
//!
//! ## Overview
//!
//! AutoCore control programs run as separate processes that communicate with the
//! autocore-server via shared memory and IPC. This library handles all the low-level
//! details, allowing you to focus on your control logic.
//!
//! ```text
//! ┌─────────────────────────┐     ┌─────────────────────────┐
//! │   autocore-server       │     │   Your Control Program  │
//! │                         │     │                         │
//! │  ┌─────────────────┐    │     │  ┌─────────────────┐    │
//! │  │ Shared Memory   │◄───┼─────┼──│ ControlRunner   │    │
//! │  │ (GlobalMemory)  │    │     │  │                 │    │
//! │  └─────────────────┘    │     │  │ ┌─────────────┐ │    │
//! │                         │     │  │ │ Your Logic  │ │    │
//! │  ┌─────────────────┐    │     │  │ └─────────────┘ │    │
//! │  │ Tick Signal     │────┼─────┼──│                 │    │
//! │  └─────────────────┘    │     │  └─────────────────┘    │
//! └─────────────────────────┘     └─────────────────────────┘
//! ```
//!
//! ## Quick Start
//!
//! 1. Create a new control project using `acctl`:
//!    ```bash
//!    acctl clone <server-ip> <project-name>
//!    ```
//!
//! 2. Implement the [`ControlProgram`] trait:
//!    ```ignore
//!    use autocore_std::{ControlProgram, TickContext};
//!    use autocore_std::fb::RTrig;
//!
//!    // GlobalMemory is generated from your project.json
//!    mod gm;
//!    use gm::GlobalMemory;
//!
//!    pub struct MyProgram {
//!        start_button: RTrig,
//!    }
//!
//!    impl MyProgram {
//!        pub fn new() -> Self {
//!            Self {
//!                start_button: RTrig::new(),
//!            }
//!        }
//!    }
//!
//!    impl ControlProgram for MyProgram {
//!        type Memory = GlobalMemory;
//!
//!        fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
//!            // Detect rising edge on start button
//!            if self.start_button.call(ctx.gm.inputs.start_button) {
//!                ctx.gm.outputs.motor_running = true;
//!                autocore_std::log::info!("Motor started!");
//!            }
//!        }
//!    }
//!    ```
//!
//! 3. Use the [`autocore_main!`] macro for the entry point:
//!    ```ignore
//!    autocore_std::autocore_main!(MyProgram, "my_project_shm", "tick");
//!    ```
//!
//! ## Function Blocks (IEC 61131-3 Inspired)
//!
//! This library includes standard function blocks commonly used in PLC programming:
//!
//! - [`fb::RTrig`] - Rising edge detector (false→true transition)
//! - [`fb::FTrig`] - Falling edge detector (true→false transition)
//! - [`fb::Ton`] - Timer On Delay (output after delay)
//! - [`fb::BitResetOnDelay`] - Resets a boolean after it has been true for a duration
//! - [`fb::SimpleTimer`] - Simple one-shot timer (NOT IEC 61131-3, for imperative use)
//! - [`fb::StateMachine`] - State machine helper with automatic timer management
//! - [`fb::RunningAverage`] - Accumulates values and computes their arithmetic mean
//! - [`fb::Beeper`] - Audible beeper controller with configurable beep sequences
//! - [`fb::Heartbeat`] - Monitors a remote heartbeat counter for connection loss
//!
//! ### Example: Edge Detection
//!
//! ```
//! use autocore_std::fb::RTrig;
//!
//! let mut trigger = RTrig::new();
//!
//! // First call with false - no edge
//! assert_eq!(trigger.call(false), false);
//!
//! // Rising edge detected!
//! assert_eq!(trigger.call(true), true);
//!
//! // Still true, but no edge (already high)
//! assert_eq!(trigger.call(true), false);
//!
//! // Back to false
//! assert_eq!(trigger.call(false), false);
//!
//! // Another rising edge
//! assert_eq!(trigger.call(true), true);
//! ```
//!
//! ### Example: Timer
//!
//! ```
//! use autocore_std::fb::Ton;
//! use std::time::Duration;
//!
//! let mut timer = Ton::new();
//! let delay = Duration::from_millis(100);
//!
//! // Timer not enabled - output is false
//! assert_eq!(timer.call(false, delay), false);
//!
//! // Enable timer - starts counting
//! assert_eq!(timer.call(true, delay), false);
//!
//! // Still counting...
//! std::thread::sleep(Duration::from_millis(50));
//! assert_eq!(timer.call(true, delay), false);
//! assert!(timer.et < delay); // Elapsed time < preset
//!
//! // After delay elapsed
//! std::thread::sleep(Duration::from_millis(60));
//! assert_eq!(timer.call(true, delay), true); // Output is now true!
//! ```
//!
//! ## Logging
//!
//! Control programs can send log messages to the autocore-server for display in the
//! web console. Logging is handled automatically when using [`ControlRunner`].
//!
//! ```ignore
//! use autocore_std::log;
//!
//! log::trace!("Detailed trace message");
//! log::debug!("Debug information");
//! log::info!("Normal operation message");
//! log::warn!("Warning condition detected");
//! log::error!("Error occurred!");
//! ```
//!
//! See the [`logger`] module for advanced configuration.
//!
//! ## Memory Synchronization
//!
//! The [`ControlRunner`] handles all shared memory synchronization automatically:
//!
//! 1. **Wait for tick** - Blocks until the server signals a new cycle
//! 2. **Read inputs** - Copies shared memory to local buffer (atomic snapshot)
//! 3. **Execute logic** - Your `process_tick` runs on the local buffer
//! 4. **Write outputs** - Copies local buffer back to shared memory
//!
//! This ensures your control logic always sees a consistent view of the data,
//! even when other processes are modifying shared memory.

#![warn(missing_docs)]
#![warn(rustdoc::missing_crate_level_docs)]
#![doc(html_root_url = "https://docs.rs/autocore-std/3.3.0")]

use anyhow::{anyhow, Result};
use futures_util::{SinkExt, StreamExt};
use log::LevelFilter;
use mechutil::ipc::{CommandMessage, MessageType};
use raw_sync::events::{Event, EventInit, EventState};
use raw_sync::Timeout;
use shared_memory::ShmemConf;
use std::collections::HashMap;
use std::sync::atomic::{fence, Ordering, AtomicBool};
use std::sync::Arc;
use std::time::Duration;
use tokio_tungstenite::{connect_async, tungstenite::Message};

/// UDP logger for sending log messages to autocore-server.
///
/// This module provides a non-blocking logger implementation that sends log messages
/// via UDP to the autocore-server. Messages are batched and sent asynchronously to
/// avoid impacting the control loop timing.
///
/// # Example
///
/// ```ignore
/// use autocore_std::logger;
/// use log::LevelFilter;
///
/// // Initialize the logger (done automatically by ControlRunner)
/// logger::init_udp_logger("127.0.0.1", 39101, LevelFilter::Info, "control")?;
///
/// // Now you can use the log macros
/// log::info!("System initialized");
/// ```
pub mod logger;

// Re-export log crate for convenience - control programs can use autocore_std::log::info!() etc.
pub use log;

/// Function blocks for control programs (IEC 61131-3 inspired).
pub mod fb;

/// Interface protocols for communication between control programs and external sources.
pub mod iface;

/// Client for sending IPC commands to external modules via WebSocket.
pub mod command_client;
pub use command_client::CommandClient;

/// EtherCAT utilities (SDO client, etc.).
pub mod ethercat;

/// CiA 402 motion control: axis abstraction, traits, and types.
pub mod motion;

/// Shared memory utilities for external modules.
pub mod shm;

/// Lightweight process diagnostics (FD count, RSS).
pub mod diagnostics;

/// Banner Engineering device helpers (WLS15 IO-Link light strip, etc.).
pub mod banner;

/// Fixed-length string type for shared memory variables.
pub mod fixed_string;
pub use fixed_string::FixedString;

// ============================================================================
// Core Framework
// ============================================================================

/// Marker trait for generated GlobalMemory structs.
///
/// This trait is implemented by the auto-generated `GlobalMemory` struct
/// that represents the shared memory layout. It declares no methods or
/// associated items; it serves purely as a marker for type safety in the
/// control framework.
///
/// You don't need to implement this trait yourself - it's automatically
/// implemented by the code generator.
pub trait AutoCoreMemory {}

/// Trait for detecting changes in memory structures.
///
/// Required of every [`ControlProgram::Memory`] type. Besides change
/// detection it offers two optional hooks for bit-mapped variables, both of
/// which default to doing nothing.
pub trait ChangeTracker {
    /// Compare self with a previous state and return a list of changed fields.
    ///
    /// # Arguments
    ///
    /// * `prev` - The earlier state to compare `self` against.
    ///
    /// # Returns
    ///
    /// A vector of `(field_name, new_value)` pairs.
    fn get_changes(&self, prev: &Self) -> Vec<(&'static str, serde_json::Value)>;

    /// Unpack bit-mapped variables from their source words.
    ///
    /// Called automatically after reading shared memory, before `process_tick`.
    /// Auto-generated by codegen when bit-mapped variables exist; default is no-op.
    fn unpack_bits(&mut self) {}

    /// Pack bit-mapped variables back into their source words.
    ///
    /// Called automatically after `process_tick`, before writing shared memory.
    /// Only packs sources where at least one mapped bool changed since `pre_tick`.
    /// Auto-generated by codegen when bit-mapped variables exist; default is no-op.
    ///
    /// # Arguments
    ///
    /// * `_pre_tick` - The memory state captured before `process_tick` ran.
    fn pack_bits(&mut self, _pre_tick: &Self) {}
}

/// Per-tick context passed to the control program by the framework.
///
/// `TickContext` bundles all per-cycle data into a single struct so that the
/// [`ControlProgram::process_tick`] signature stays stable as new fields are
/// added in the future (e.g., delta time, diagnostics).
///
/// The framework constructs a fresh `TickContext` each cycle, calls
/// [`CommandClient::poll`] before handing it to the program, and writes
/// the memory back to shared memory after `process_tick` returns.
///
/// The type parameter `M` is the memory type; `process_tick` receives a
/// `TickContext<Self::Memory>`. All references are borrowed for the
/// lifetime `'a`, so the context does not own the memory or the client.
pub struct TickContext<'a, M> {
    /// Mutable reference to the local shared memory copy.
    pub gm: &'a mut M,
    /// IPC command client for communicating with external modules.
    pub client: &'a mut CommandClient,
    /// Current cycle number (starts at 1, increments each tick).
    pub cycle: u64,
}

/// The trait that defines a control program's logic.
///
/// Implement this trait to create your control program. The associated `Memory`
/// type should be the generated `GlobalMemory` struct from your project.
///
/// # Memory Type Requirements
///
/// The `Memory` type must implement `Copy` to allow efficient synchronization
/// between shared memory and local buffers, and [`ChangeTracker`] so that
/// changed fields can be detected. `Copy` is automatically satisfied
/// by the generated `GlobalMemory` struct.
///
/// # Lifecycle
///
/// 1. `initialize` is called once at startup
/// 2. `process_tick` is called repeatedly in the control loop with a
///    [`TickContext`] that provides shared memory, the IPC client, and the
///    current cycle number.
///
/// # Example
///
/// ```ignore
/// use autocore_std::{ControlProgram, TickContext};
///
/// mod gm;
/// use gm::GlobalMemory;
///
/// pub struct MyController {
///     cycle_counter: u64,
/// }
///
/// impl MyController {
///     pub fn new() -> Self {
///         Self { cycle_counter: 0 }
///     }
/// }
///
/// impl ControlProgram for MyController {
///     type Memory = GlobalMemory;
///
///     fn initialize(&mut self, mem: &mut GlobalMemory) {
///         // Set initial output states
///         mem.outputs.ready = true;
///         log::info!("Controller initialized");
///     }
///
///     fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
///         self.cycle_counter = ctx.cycle;
///
///         // Your control logic here
///         if ctx.gm.inputs.start && !ctx.gm.inputs.estop {
///             ctx.gm.outputs.running = true;
///         }
///     }
/// }
/// ```
pub trait ControlProgram {
    /// The shared memory structure type (usually the generated `GlobalMemory`).
    ///
    /// Must implement `Copy` to allow efficient memory synchronization, and
    /// [`ChangeTracker`] so that changes between two states can be detected.
    type Memory: Copy + ChangeTracker;

    /// Called once when the control program starts.
    ///
    /// Use this to initialize output states, reset counters, or perform
    /// any one-time setup. The default implementation does nothing.
    ///
    /// # Arguments
    ///
    /// * `mem` - Mutable reference to the shared memory. Changes are written
    ///           back to shared memory after this method returns.
    fn initialize(&mut self, _mem: &mut Self::Memory) {}

    /// The main control loop - called once per scan cycle.
    ///
    /// This is where your control logic lives. Read inputs from `ctx.gm`,
    /// perform calculations, and write outputs back to `ctx.gm`. Use
    /// `ctx.client` for IPC commands and `ctx.cycle` for the current cycle
    /// number.
    ///
    /// The framework calls [`CommandClient::poll`] before each invocation,
    /// so incoming responses are already buffered when your code runs.
    ///
    /// # Arguments
    ///
    /// * `ctx` - A [`TickContext`] containing the local shared memory copy,
    ///           the IPC command client, and the current cycle number.
    ///
    /// # Timing
    ///
    /// This method should complete within the scan cycle time. Long-running
    /// operations will cause cycle overruns.
    fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>);
}

/// Configuration for the [`ControlRunner`].
///
/// Specifies connection parameters, shared memory names, and logging settings.
/// Use [`Default::default()`] for typical configurations.
///
/// # Example
///
/// ```
/// use autocore_std::RunnerConfig;
/// use log::LevelFilter;
///
/// let config = RunnerConfig {
///     server_host: "192.168.1.100".to_string(),
///     module_name: "my_controller".to_string(),
///     shm_name: "my_project_shm".to_string(),
///     tick_signal_name: "tick".to_string(),
///     busy_signal_name: Some("busy".to_string()),
///     log_level: LevelFilter::Debug,
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct RunnerConfig {
    /// Server host address (default: "127.0.0.1")
    pub server_host: String,
    /// WebSocket port for commands (default: 11969, i.e. [`DEFAULT_WS_PORT`])
    pub ws_port: u16,
    /// Module name for identification (default: "control")
    pub module_name: String,
    /// Shared memory segment name (must match server configuration;
    /// default: "autocore_cyclic")
    pub shm_name: String,
    /// Name of the tick signal in shared memory (triggers each scan cycle;
    /// default: "tick")
    pub tick_signal_name: String,
    /// Optional name of the busy signal (set when cycle completes;
    /// default: `None`, meaning no busy signal is used)
    pub busy_signal_name: Option<String>,
    /// Minimum log level to send to the server (default: Info)
    pub log_level: LevelFilter,
    /// UDP port for sending logs to the server (default: 39101, taken from
    /// `logger::DEFAULT_LOG_UDP_PORT`)
    pub log_udp_port: u16,
}

/// Default WebSocket port for autocore-server.
///
/// Used as the default value of [`RunnerConfig::ws_port`].
pub const DEFAULT_WS_PORT: u16 = 11969;

impl Default for RunnerConfig {
    /// Returns the stock configuration: a server on `127.0.0.1` at the
    /// default WebSocket and log UDP ports, the `"control"` module name, the
    /// `"autocore_cyclic"` shared memory segment, a `"tick"` signal, no busy
    /// signal, and `Info`-level logging.
    fn default() -> Self {
        // Owned string defaults, built up front so the struct literal below
        // reads as a plain field list.
        let server_host = String::from("127.0.0.1");
        let module_name = String::from("control");
        let shm_name = String::from("autocore_cyclic");
        let tick_signal_name = String::from("tick");

        Self {
            server_host,
            ws_port: DEFAULT_WS_PORT,
            module_name,
            shm_name,
            tick_signal_name,
            busy_signal_name: None,
            log_level: LevelFilter::Info,
            log_udp_port: logger::DEFAULT_LOG_UDP_PORT,
        }
    }
}


/// The main execution engine for control programs.
///
/// `ControlRunner` handles all the infrastructure required to run a control program:
///
/// - Fetching the memory layout from the server (`gm.get_layout` request over WebSocket)
/// - Opening and mapping shared memory
/// - Setting up synchronization signals
/// - Running the real-time control loop
/// - Sending log messages to the server
///
/// # Usage
///
/// ```ignore
/// use autocore_std::{ControlRunner, RunnerConfig};
///
/// let config = RunnerConfig {
///     shm_name: "my_project_shm".to_string(),
///     tick_signal_name: "tick".to_string(),
///     ..Default::default()
/// };
///
/// ControlRunner::new(MyProgram::new())
///     .config(config)
///     .run()?;  // Blocks forever
/// ```
///
/// # Control Loop
///
/// The runner executes a synchronous control loop:
///
/// 1. **Wait** - Blocks until the tick signal is set by the server
/// 2. **Read** - Copies shared memory to a local buffer (acquire barrier)
/// 3. **Execute** - Calls your `process_tick` method
/// 4. **Write** - Copies local buffer back to shared memory (release barrier)
/// 5. **Signal** - Sets the busy signal (if configured) to indicate completion
///
/// This ensures your code always sees a consistent snapshot of the data
/// and that your writes are atomically visible to other processes.
pub struct ControlRunner<P: ControlProgram> {
    // Connection, shared memory, and logging settings. Starts out as
    // `RunnerConfig::default()` and can be replaced through `config()`.
    config: RunnerConfig,
    // The user's control program that this runner drives.
    program: P,
}

impl<P: ControlProgram> ControlRunner<P> {
    /// Wraps `program` in a runner that starts out with the default
    /// [`RunnerConfig`].
    ///
    /// Chain [`.config()`](Self::config) afterwards to override the defaults.
    ///
    /// # Arguments
    ///
    /// * `program` - The control program instance the runner will drive
    ///
    /// # Example
    ///
    /// ```ignore
    /// let runner = ControlRunner::new(MyProgram::new());
    /// ```
    pub fn new(program: P) -> Self {
        let config = RunnerConfig::default();
        Self { config, program }
    }

    /// Sets the configuration for this runner.
    ///
    /// # Arguments
    ///
    /// * `config` - The configuration to use
    ///
    /// # Example
    ///
    /// ```ignore
    /// ControlRunner::new(MyProgram::new())
    ///     .config(RunnerConfig {
    ///         shm_name: "custom_shm".to_string(),
    ///         ..Default::default()
    ///     })
    ///     .run()?;
    /// ```
    pub fn config(mut self, config: RunnerConfig) -> Self {
        self.config = config;
        self
    }

    /// Starts the control loop.
    ///
    /// This method blocks indefinitely, running the control loop until
    /// an error occurs or the process is terminated.
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` only if the loop exits cleanly (which typically
    /// doesn't happen). Returns an error if:
    ///
    /// - IPC connection fails
    /// - Shared memory cannot be opened
    /// - Signal offsets cannot be found
    /// - A critical error occurs during execution
    ///
    /// # Example
    ///
    /// ```ignore
    /// fn main() -> anyhow::Result<()> {
    ///     ControlRunner::new(MyProgram::new())
    ///         .config(config)
    ///         .run()
    /// }
    /// ```
    pub fn run(mut self) -> Result<()> {
        // Initialize UDP logger FIRST (before any log statements)
        if let Err(e) = logger::init_udp_logger(
            &self.config.server_host,
            self.config.log_udp_port,
            self.config.log_level,
            "control",
        ) {
            eprintln!("Warning: Failed to initialize UDP logger: {}", e);
            // Continue anyway - logging will just go nowhere
        }

        // Multi-threaded runtime so spawned WS read/write tasks can run
        // alongside the synchronous control loop.
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()?;

        rt.block_on(async {
            log::info!("AutoCore Control Runner Starting...");

            // 1. Connect to server via WebSocket and get layout
            let ws_url = format!("ws://{}:{}/ws/", self.config.server_host, self.config.ws_port);
            log::info!("Connecting to server at {}", ws_url);

            let (ws_stream, _) = connect_async(&ws_url).await
                .map_err(|e| anyhow!("Failed to connect to server at {}: {}", ws_url, e))?;

            let (mut write, mut read) = ws_stream.split();

            // Send gm.get_layout request
            let request = CommandMessage::request("gm.get_layout", serde_json::Value::Null);
            let transaction_id = request.transaction_id;
            let request_json = serde_json::to_string(&request)?;

            write.send(Message::Text(request_json)).await
                .map_err(|e| anyhow!("Failed to send layout request: {}", e))?;

            // Wait for response with matching transaction_id
            let timeout = Duration::from_secs(10);
            let start = std::time::Instant::now();
            let mut layout: Option<HashMap<String, serde_json::Value>> = None;

            while start.elapsed() < timeout {
                match tokio::time::timeout(Duration::from_secs(1), read.next()).await {
                    Ok(Some(Ok(Message::Text(text)))) => {
                        if let Ok(response) = serde_json::from_str::<CommandMessage>(&text) {
                            if response.transaction_id == transaction_id {
                                if !response.success {
                                    return Err(anyhow!("Server error: {}", response.error_message));
                                }
                                layout = Some(serde_json::from_value(response.data)?);
                                break;
                            }
                            // Skip broadcasts and other messages
                            if response.message_type == MessageType::Broadcast {
                                continue;
                            }
                        }
                    }
                    Ok(Some(Ok(_))) => continue,
                    Ok(Some(Err(e))) => return Err(anyhow!("WebSocket error: {}", e)),
                    Ok(None) => return Err(anyhow!("Server closed connection")),
                    Err(_) => continue, // Timeout on single read, keep trying
                }
            }

            let layout = layout.ok_or_else(|| anyhow!("Timeout waiting for layout response"))?;
            log::info!("Layout received with {} entries.", layout.len());

            // Set up channels and background tasks for shared WebSocket access.
            // This allows both the control loop (gm.write) and CommandClient (IPC
            // commands) to share the write half, while routing incoming responses
            // to the CommandClient.
            let (ws_write_tx, mut ws_write_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
            let (response_tx, response_rx) = tokio::sync::mpsc::unbounded_channel::<CommandMessage>();

            // Background task: WS write loop
            // Reads serialized messages from ws_write_rx and sends them over the WebSocket.
            tokio::spawn(async move {
                while let Some(msg_json) = ws_write_rx.recv().await {
                    if let Err(e) = write.send(Message::Text(msg_json)).await {
                        log::error!("WebSocket write error: {}", e);
                        break;
                    }
                }
            });

            // Background task: WS read loop
            // Reads all incoming WebSocket messages. Routes Response messages to
            // response_tx for the CommandClient; ignores broadcasts and others.
            tokio::spawn(async move {
                while let Some(result) = read.next().await {
                    match result {
                        Ok(Message::Text(text)) => {
                            if let Ok(msg) = serde_json::from_str::<CommandMessage>(&text) {
                                if msg.message_type == MessageType::Response {
                                    if response_tx.send(msg).is_err() {
                                        break; // receiver dropped
                                    }
                                }
                                // Broadcasts and other message types are ignored
                            }
                        }
                        Ok(Message::Close(_)) => {
                            log::info!("WebSocket closed by server");
                            break;
                        }
                        Err(e) => {
                            log::error!("WebSocket read error: {}", e);
                            break;
                        }
                        _ => {} // Ping/Pong/Binary - ignore
                    }
                }
            });

            // Construct CommandClient — owned by the runner, passed to the
            // program via TickContext each cycle.
            let mut command_client = CommandClient::new(ws_write_tx.clone(), response_rx);

            // 2. Find Signal Offsets
            let tick_offset = self.find_offset(&layout, &self.config.tick_signal_name)?;
            let busy_offset = if let Some(name) = &self.config.busy_signal_name {
                Some(self.find_offset(&layout, name)?)
            } else {
                None
            };

            // 4. Open Shared Memory
            let shmem = ShmemConf::new().os_id(&self.config.shm_name).open()?;
            let base_ptr = shmem.as_ptr();
            log::info!("Shared Memory '{}' mapped.", self.config.shm_name);

            // 5. Setup Pointers
            // SAFETY: We trust the server's layout matches the generated GlobalMemory struct.
            let gm = unsafe { &mut *(base_ptr as *mut P::Memory) };

            // Get tick event from shared memory
            log::info!("Setting up tick event at offset {} (base_ptr: {:p})", tick_offset, base_ptr);
            let (tick_event, _) = unsafe {
                Event::from_existing(base_ptr.add(tick_offset))
            }.map_err(|e| anyhow!("Failed to open tick event: {:?}", e))?;
            log::info!("Tick event ready");

            // Busy signal event (optional)
            let busy_event = busy_offset.map(|offset| {
                unsafe { Event::from_existing(base_ptr.add(offset)) }
                    .map(|(event, _)| event)
                    .ok()
            }).flatten();

            // 6. Initialize local memory buffer and user program
            // We use a local copy for the control loop to ensure:
            // - Consistent snapshot of inputs at start of cycle
            // - Atomic commit of outputs at end of cycle
            // - Proper memory barriers for cross-process visibility
            let mut local_mem: P::Memory = unsafe { std::ptr::read_volatile(gm) };
            let mut prev_mem: P::Memory = local_mem; // Snapshot for change detection

            fence(Ordering::Acquire); // Ensure we see all prior writes from other processes

            self.program.initialize(&mut local_mem);

            // Write back any changes from initialize
            fence(Ordering::Release);
            unsafe { std::ptr::write_volatile(gm, local_mem) };

            // Set up signal handler for graceful shutdown
            let running = Arc::new(AtomicBool::new(true));
            let r = running.clone();
            
            // Only set handler if not already set
            if let Err(e) = ctrlc::set_handler(move || {
                r.store(false, Ordering::SeqCst);
            }) {
                log::warn!("Failed to set signal handler: {}", e);
            }

            log::info!("Entering Control Loop - waiting for first tick...");
            let mut cycle_count: u64 = 0;
            let mut consecutive_timeouts: u32 = 0;

            while running.load(Ordering::SeqCst) {
                // Wait for Tick - Event-based synchronization
                // Use a timeout (1s) to allow checking the running flag periodically
                match tick_event.wait(Timeout::Val(Duration::from_secs(1))) {
                    Ok(_) => {
                        consecutive_timeouts = 0;
                    },
                    Err(e) => {
                        // Check for timeout
                        let err_str = format!("{:?}", e);
                        if err_str.contains("Timeout") {
                            consecutive_timeouts += 1;
                            if consecutive_timeouts == 10 {
                                log::error!(
                                    "TICK STALL: {} consecutive timeouts! cycle={} pending={} responses={} fds={} rss_kb={}",
                                    consecutive_timeouts,
                                    cycle_count,
                                    command_client.pending_count(),
                                    command_client.response_count(),
                                    diagnostics::count_open_fds(),
                                    diagnostics::get_rss_kb(),
                                );
                            }
                            if consecutive_timeouts > 10 && consecutive_timeouts % 60 == 0 {
                                log::error!(
                                    "TICK STALL continues: {} consecutive timeouts, cycle={}",
                                    consecutive_timeouts,
                                    cycle_count,
                                );
                            }
                            continue;
                        }
                        return Err(anyhow!("Tick wait failed: {:?}", e));
                    }
                }

                if !running.load(Ordering::SeqCst) {
                    log::info!("Shutdown signal received, exiting control loop.");
                    break;
                }

                cycle_count += 1;
                if cycle_count == 1 {
                    log::info!("First tick received!");
                }

                // // Periodic diagnostics (every 30s at 100 Hz)
                // if cycle_count % 3000 == 0 {
                //     log::info!(
                //         "DIAG cycle={} pending={} responses={} fds={} rss_kb={}",
                //         cycle_count,
                //         command_client.pending_count(),
                //         command_client.response_count(),
                //         diagnostics::count_open_fds(),
                //         diagnostics::get_rss_kb(),
                //     );
                // }

                // === INPUT PHASE ===
                // Read all variables from shared memory into local buffer.
                // This gives us a consistent snapshot of inputs for this cycle.
                // Acquire fence ensures we see all writes from other processes (server, modules).
                local_mem = unsafe { std::ptr::read_volatile(gm) };
                
                // Snapshot for change detection: capture the state exactly as it was
                // just read from shared memory, BEFORE the control program executes.
                // The goal is to broadcast writes on shared variables (typically
                // outputs), so the post-tick state is later compared against this
                // snapshot. If a value that was changed by another source ends up
                // being broadcast again as a result, that is fine too.
                prev_mem = local_mem;

                fence(Ordering::Acquire);

                // Unpack bit-mapped variables from their source words.
                local_mem.unpack_bits();

                // Snapshot after unpack — used by pack_bits to detect which
                // bools the control program actually changed.
                let pre_tick = local_mem;

                // === EXECUTE PHASE ===
                // Poll IPC responses so they are available during process_tick.
                command_client.poll();

                // Execute user logic on the local copy.
                // All reads/writes during process_tick operate on local_mem.
                let mut ctx = TickContext {
                    gm: &mut local_mem,
                    client: &mut command_client,
                    cycle: cycle_count,
                };
                self.program.process_tick(&mut ctx);

                // === OUTPUT PHASE ===
                // Pack bit-mapped variables back into their source words,
                // but only for sources where a mapped bool actually changed.
                local_mem.pack_bits(&pre_tick);

                // Write all variables from local buffer back to shared memory.
                // Release fence ensures our writes are visible to other processes.
                fence(Ordering::Release);
                unsafe { std::ptr::write_volatile(gm, local_mem) };

                // === CHANGE DETECTION & NOTIFICATION ===
                let changes = local_mem.get_changes(&prev_mem);
                if !changes.is_empty() {
                    // Construct bulk write message
                    let mut data_map = serde_json::Map::new();
                    for (key, val) in changes {
                        data_map.insert(key.to_string(), val);
                    }
                    
                    let msg = CommandMessage::request("gm.write", serde_json::Value::Object(data_map));
                    let msg_json = serde_json::to_string(&msg).unwrap_or_default();

                    // Send via the shared write channel (non-blocking)
                    if let Err(e) = ws_write_tx.send(msg_json) {
                        log::error!("Failed to send updates: {}", e);
                    }
                }

                // Signal Busy/Done event
                if let Some(ref busy_ev) = busy_event {
                    let _ = busy_ev.set(EventState::Signaled);
                }
            }

            Ok(())
        })
    }

    /// Looks up the offset recorded for the signal `name` in `layout`.
    ///
    /// `layout` maps signal names to JSON values; the entry for `name` is
    /// expected to carry an `"offset"` field holding an unsigned integer.
    ///
    /// # Errors
    ///
    /// Returns an error when `name` has no entry in `layout`, or when its
    /// `"offset"` field is missing, is not an unsigned integer, or does not
    /// fit in `usize` on the current target.
    fn find_offset(&self, layout: &HashMap<String, serde_json::Value>, name: &str) -> Result<usize> {
        let info = layout
            .get(name)
            .ok_or_else(|| anyhow!("Signal '{}' not found in layout", name))?;

        info.get("offset")
            .and_then(|v| v.as_u64())
            // Checked conversion: a plain `as usize` cast would silently
            // truncate an oversized offset on targets where `usize` is
            // narrower than 64 bits. The fully-qualified path keeps this
            // independent of which traits the file has in scope.
            .and_then(|raw| <usize as std::convert::TryFrom<u64>>::try_from(raw).ok())
            .ok_or_else(|| anyhow!("Invalid offset for '{}'", name))
    }
}

/// Generates the standard `main` function for a control program.
///
/// This macro reduces boilerplate by creating a properly configured `main`
/// function that initializes and runs your control program.
///
/// # Arguments
///
/// * `$prog_type` - The type of your control program (must implement [`ControlProgram`]
///   and provide an associated `new()` function taking no arguments)
/// * `$shm_name` - The shared memory segment name (any expression supporting
///   `.to_string()`, typically a string literal)
/// * `$tick_signal` - The tick signal name in shared memory (any expression
///   supporting `.to_string()`, typically a string literal)
///
/// A trailing comma after the last argument is accepted.
///
/// # Requirements
///
/// The generated code names `anyhow::Result` and `log::LevelFilter` directly,
/// so the calling crate must have both `anyhow` and `log` as dependencies.
/// Items from this crate are referenced through `$crate`, so the macro keeps
/// working even if the dependency is renamed in the caller's `Cargo.toml`.
///
/// # Example
///
/// ```ignore
/// mod gm;
/// use gm::GlobalMemory;
///
/// pub struct MyProgram;
///
/// impl MyProgram {
///     pub fn new() -> Self { Self }
/// }
///
/// impl autocore_std::ControlProgram for MyProgram {
///     type Memory = GlobalMemory;
///
///     fn process_tick(&mut self, ctx: &mut autocore_std::TickContext<Self::Memory>) {
///         // Your logic here
///     }
/// }
///
/// // This generates the main function
/// autocore_std::autocore_main!(MyProgram, "my_project_shm", "tick");
/// ```
///
/// # Generated Code
///
/// With the crate imported under its default name, the macro expands to:
///
/// ```ignore
/// fn main() -> anyhow::Result<()> {
///     let config = autocore_std::RunnerConfig {
///         server_host: "127.0.0.1".to_string(),
///         ws_port: autocore_std::DEFAULT_WS_PORT,
///         module_name: "control".to_string(),
///         shm_name: "my_project_shm".to_string(),
///         tick_signal_name: "tick".to_string(),
///         busy_signal_name: None,
///         log_level: log::LevelFilter::Info,
///         log_udp_port: autocore_std::logger::DEFAULT_LOG_UDP_PORT,
///     };
///
///     autocore_std::ControlRunner::new(MyProgram::new())
///         .config(config)
///         .run()
/// }
/// ```
#[macro_export]
macro_rules! autocore_main {
    ($prog_type:ty, $shm_name:expr, $tick_signal:expr $(,)?) => {
        fn main() -> anyhow::Result<()> {
            // `$crate` resolves to this library regardless of the name the
            // caller gave the dependency.
            let config = $crate::RunnerConfig {
                server_host: "127.0.0.1".to_string(),
                ws_port: $crate::DEFAULT_WS_PORT,
                module_name: "control".to_string(),
                shm_name: $shm_name.to_string(),
                tick_signal_name: $tick_signal.to_string(),
                busy_signal_name: None,
                log_level: log::LevelFilter::Info,
                log_udp_port: $crate::logger::DEFAULT_LOG_UDP_PORT,
            };

            $crate::ControlRunner::new(<$prog_type>::new())
                .config(config)
                .run()
        }
    };
}