1use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, AtomicU64, Ordering};
5
6pub const CONTROL_REGION_SIZE: usize = 4096;
7
8const DATA_SIZE: usize = std::mem::size_of::<ControlRegionData>();
9
10#[repr(C)]
11pub struct ControlRegionData {
12 pub client_head: AtomicU64,
13 pub daemon_tail: AtomicU64,
14 pub client_seq: AtomicU64,
15 pub daemon_seq: AtomicU64,
16 pub client_futex: AtomicU32,
17 pub daemon_futex: AtomicU32,
18 pub daemon_alive: AtomicBool,
19 pub client_count: AtomicU32,
20 pub backpressure: AtomicU32,
21 pub version: AtomicU16,
22 pub flags: AtomicU16,
23}
24
25#[repr(C, align(4096))]
26pub struct ControlRegion {
27 pub data: ControlRegionData,
28 padding: [u8; CONTROL_REGION_SIZE - DATA_SIZE],
29}
30
31const _: () = assert!(
32 std::mem::size_of::<ControlRegion>() == CONTROL_REGION_SIZE,
33 "ControlRegion must be exactly 4096 bytes"
34);
35
36const _: () = assert!(
37 std::mem::align_of::<ControlRegion>() >= 4096,
38 "ControlRegion must be page-aligned (4096 bytes)"
39);
40
41impl ControlRegion {
42 pub unsafe fn init(this: *mut Self) {
52 std::ptr::write(
53 this,
54 Self {
55 data: ControlRegionData {
56 client_head: AtomicU64::new(0),
57 daemon_tail: AtomicU64::new(0),
58 client_seq: AtomicU64::new(0),
59 daemon_seq: AtomicU64::new(0),
60 client_futex: AtomicU32::new(0),
61 daemon_futex: AtomicU32::new(0),
62 daemon_alive: AtomicBool::new(false),
63 client_count: AtomicU32::new(0),
64 backpressure: AtomicU32::new(0),
65 version: AtomicU16::new(1),
66 flags: AtomicU16::new(0),
67 },
68 padding: [0u8; CONTROL_REGION_SIZE - DATA_SIZE],
69 },
70 );
71 }
72
73 pub fn backpressure(&self) -> f32 {
74 self.data.backpressure.load(Ordering::Acquire) as f32 / 1000.0
75 }
76
77 pub fn set_backpressure(&self, value: f32) {
78 let clamped = value.clamp(0.0, 1.0);
79 let scaled = (clamped * 1000.0) as u32;
80 self.data.backpressure.store(scaled, Ordering::Release);
81 }
82
83 pub fn increment_client_count(&self) -> u32 {
84 self.data.client_count.fetch_add(1, Ordering::AcqRel) + 1
85 }
86
87 pub fn decrement_client_count(&self) -> u32 {
88 self.data.client_count
89 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| {
90 count.checked_sub(1)
91 })
92 .unwrap_or(0)
93 }
94
95 pub fn set_daemon_alive(&self, alive: bool) {
96 self.data.daemon_alive.store(alive, Ordering::Release);
97 }
98
99 pub fn is_daemon_alive(&self) -> bool {
100 self.data.daemon_alive.load(Ordering::Acquire)
101 }
102
103 pub fn client_count(&self) -> u32 {
104 self.data.client_count.load(Ordering::Acquire)
105 }
106
107 pub fn version(&self) -> u16 {
108 self.data.version.load(Ordering::Acquire)
109 }
110
111 pub fn client_head(&self) -> u64 {
112 self.data.client_head.load(Ordering::Acquire)
113 }
114
115 pub fn set_client_head(&self, pos: u64) {
116 self.data.client_head.store(pos, Ordering::Release);
117 }
118
119 pub fn daemon_tail(&self) -> u64 {
120 self.data.daemon_tail.load(Ordering::Acquire)
121 }
122
123 pub fn set_daemon_tail(&self, pos: u64) {
124 self.data.daemon_tail.store(pos, Ordering::Release);
125 }
126
127 pub fn client_seq(&self) -> u64 {
128 self.data.client_seq.load(Ordering::Acquire)
129 }
130
131 pub fn increment_client_seq(&self) -> u64 {
132 self.data.client_seq.fetch_add(1, Ordering::AcqRel) + 1
133 }
134
135 pub fn daemon_seq(&self) -> u64 {
136 self.data.daemon_seq.load(Ordering::Acquire)
137 }
138
139 pub fn increment_daemon_seq(&self) -> u64 {
140 self.data.daemon_seq.fetch_add(1, Ordering::AcqRel) + 1
141 }
142
143 pub fn client_futex(&self) -> u32 {
144 self.data.client_futex.load(Ordering::Acquire)
145 }
146
147 pub fn set_client_futex(&self, val: u32) {
148 self.data.client_futex.store(val, Ordering::Release);
149 }
150
151 pub fn daemon_futex(&self) -> u32 {
152 self.data.daemon_futex.load(Ordering::Acquire)
153 }
154
155 pub fn set_daemon_futex(&self, val: u32) {
156 self.data.daemon_futex.store(val, Ordering::Release);
157 }
158}