Skip to main content

memlink_shm/
control.rs

1//! Control region for shared memory coordination between daemon and clients.
2//! Contains head/tail pointers, sequence numbers, futex words, and state flags.
3
4use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicU64, Ordering};
5
6pub const CONTROL_REGION_SIZE: usize = 4096;
7
8const DATA_SIZE: usize = std::mem::size_of::<ControlRegionData>();
9
/// Live fields of the control region used for coordination between the daemon and
/// its clients.
///
/// `#[repr(C)]` pins the field order and offsets. Do not reorder, resize, insert or
/// remove fields: that would change the layout every user of the region relies on.
///
/// All fields are atomics, so they can be read and written through a shared reference.
#[derive(Debug)]
#[repr(C)]
pub struct ControlRegionData {
    /// Head position on the client side.
    // NOTE(review): presumably an offset/index into a separate data region - confirm.
    pub client_head: AtomicU64,
    /// Tail position on the daemon side.
    // NOTE(review): presumably an offset/index into a separate data region - confirm.
    pub daemon_tail: AtomicU64,
    /// Client-side sequence number; starts at 0.
    pub client_seq: AtomicU64,
    /// Daemon-side sequence number; starts at 0.
    pub daemon_seq: AtomicU64,
    /// Futex word for the client side.
    pub client_futex: AtomicU32,
    /// Futex word for the daemon side.
    pub daemon_futex: AtomicU32,
    /// Whether the daemon is marked as alive; starts as `false`.
    ///
    /// Under `repr(C)` this one-byte field is followed by three bytes of implicit
    /// padding so that `client_count` is 4-byte aligned.
    pub daemon_alive: AtomicBool,
    /// Number of registered clients.
    pub client_count: AtomicU32,
    /// Backpressure level stored as integer thousandths (`0..=1000` maps to `0.0..=1.0`).
    pub backpressure: AtomicU32,
    /// Version number; `ControlRegion::init` writes 1.
    // NOTE(review): presumably the layout/protocol version - confirm.
    pub version: AtomicU16,
    /// State flags; starts at 0.
    // TODO: document the meaning of the individual bits.
    pub flags: AtomicU16,
}
24
25#[repr(C, align(4096))]
26pub struct ControlRegion {
27    pub data: ControlRegionData,
28    padding: [u8; CONTROL_REGION_SIZE - DATA_SIZE],
29}
30
31const _: () = assert!(
32    std::mem::size_of::<ControlRegion>() == CONTROL_REGION_SIZE,
33    "ControlRegion must be exactly 4096 bytes"
34);
35
36const _: () = assert!(
37    std::mem::align_of::<ControlRegion>() >= 4096,
38    "ControlRegion must be page-aligned (4096 bytes)"
39);
40
41impl ControlRegion {
42    /// Initialize a new control region with default values
43    ///
44    /// # Safety
45    ///
46    /// This function uses `std::ptr::write` to initialize the structure.
47    /// The caller must ensure:
48    /// - `this` points to properly allocated and aligned memory
49    /// - The memory is writable and not accessed by other threads during initialization
50    /// - This is called only once before any other access
51    pub unsafe fn init(this: *mut Self) {
52        std::ptr::write(
53            this,
54            Self {
55                data: ControlRegionData {
56                    client_head: AtomicU64::new(0),
57                    daemon_tail: AtomicU64::new(0),
58                    client_seq: AtomicU64::new(0),
59                    daemon_seq: AtomicU64::new(0),
60                    client_futex: AtomicU32::new(0),
61                    daemon_futex: AtomicU32::new(0),
62                    daemon_alive: AtomicBool::new(false),
63                    client_count: AtomicU32::new(0),
64                    backpressure: AtomicU32::new(0),
65                    version: AtomicU16::new(1),
66                    flags: AtomicU16::new(0),
67                },
68                padding: [0u8; CONTROL_REGION_SIZE - DATA_SIZE],
69            },
70        );
71    }
72
73    pub fn backpressure(&self) -> f32 {
74        self.data.backpressure.load(Ordering::Acquire) as f32 / 1000.0
75    }
76
77    pub fn set_backpressure(&self, value: f32) {
78        let clamped = value.clamp(0.0, 1.0);
79        let scaled = (clamped * 1000.0) as u32;
80        self.data.backpressure.store(scaled, Ordering::Release);
81    }
82
83    pub fn increment_client_count(&self) -> u32 {
84        self.data.client_count.fetch_add(1, Ordering::AcqRel) + 1
85    }
86
87    pub fn decrement_client_count(&self) -> u32 {
88        self.data.client_count
89            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
90                count.checked_sub(1)
91            })
92            .unwrap_or(0)
93    }
94
95    pub fn set_daemon_alive(&self, alive: bool) {
96        self.data.daemon_alive.store(alive, Ordering::Release);
97    }
98
99    pub fn is_daemon_alive(&self) -> bool {
100        self.data.daemon_alive.load(Ordering::Acquire)
101    }
102
103    pub fn client_count(&self) -> u32 {
104        self.data.client_count.load(Ordering::Acquire)
105    }
106
107    pub fn version(&self) -> u16 {
108        self.data.version.load(Ordering::Acquire)
109    }
110
111    pub fn client_head(&self) -> u64 {
112        self.data.client_head.load(Ordering::Acquire)
113    }
114
115    pub fn set_client_head(&self, pos: u64) {
116        self.data.client_head.store(pos, Ordering::Release);
117    }
118
119    pub fn daemon_tail(&self) -> u64 {
120        self.data.daemon_tail.load(Ordering::Acquire)
121    }
122
123    pub fn set_daemon_tail(&self, pos: u64) {
124        self.data.daemon_tail.store(pos, Ordering::Release);
125    }
126
127    pub fn client_seq(&self) -> u64 {
128        self.data.client_seq.load(Ordering::Acquire)
129    }
130
131    pub fn increment_client_seq(&self) -> u64 {
132        self.data.client_seq.fetch_add(1, Ordering::AcqRel) + 1
133    }
134
135    pub fn daemon_seq(&self) -> u64 {
136        self.data.daemon_seq.load(Ordering::Acquire)
137    }
138
139    pub fn increment_daemon_seq(&self) -> u64 {
140        self.data.daemon_seq.fetch_add(1, Ordering::AcqRel) + 1
141    }
142
143    pub fn client_futex(&self) -> u32 {
144        self.data.client_futex.load(Ordering::Acquire)
145    }
146
147    pub fn set_client_futex(&self, val: u32) {
148        self.data.client_futex.store(val, Ordering::Release);
149    }
150
151    pub fn daemon_futex(&self) -> u32 {
152        self.data.daemon_futex.load(Ordering::Acquire)
153    }
154
155    pub fn set_daemon_futex(&self, val: u32) {
156        self.data.daemon_futex.store(val, Ordering::Release);
157    }
158}