axon/pem/state.rs
1//! [`CognitiveState`] — snapshot-able representation of an agent's
2//! mid-conversation posture.
3//!
4//! Floats encoded as Q32.32 fixed-point
5//! -----------------------------------
6//!
7//! A naive f64 roundtrip through MessagePack / JSON / Postgres
8//! `double precision` preserves the *nominal* value but not the
9//! *bit pattern*. After three reconnects the density matrix drifts
10//! by tens of ulps — small, but enough to shift downstream
11//! sampling decisions. We sidestep the class of bug entirely by
12//! quantising floats on the way in and de-quantising on the way
13//! out: every float becomes a signed 64-bit integer with 32 bits
14//! of fractional precision (Q32.32). The worst-case representable
15//! error is `2^-32 ≈ 2.3e-10`, well below the noise floor of
16//! anything a belief state cares about, and the roundtrip is
17//! bit-identical by construction.
18//!
//! Callers who need the full f64 dynamic range (rare in PEM —
//! belief entries live in `[0, 1]`) can opt out per call by storing
//! the raw value in [`MemoryEntry::payload`], which carries arbitrary
//! JSON.
23
24use chrono::{DateTime, SubsecRound, Utc};
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27
/// Scale factor of the Q32.32 encoding: 2^32, the number of raw
/// integer steps per unit. Exposed publicly because adopters may need
/// it to interpret raw rows read from a backend that was not routed
/// through this crate's codec.
pub const Q32_32_SCALE: f64 = (1u64 << 32) as f64;
32
33/// Fixed-point wrapper for a single float. Serialises as a plain
34/// signed integer to stay compact + exact in any transport.
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
36#[serde(transparent)]
37pub struct FixedPoint(pub i64);
38
39impl FixedPoint {
40 /// Quantise an `f64` to Q32.32. Saturates at the representable
41 /// range instead of panicking — `f64::INFINITY` becomes
42 /// `i64::MAX`.
43 pub fn from_f64(v: f64) -> Self {
44 let scaled = v * Q32_32_SCALE;
45 let clamped = scaled.clamp(i64::MIN as f64, i64::MAX as f64);
46 FixedPoint(clamped as i64)
47 }
48
49 pub fn to_f64(self) -> f64 {
50 (self.0 as f64) / Q32_32_SCALE
51 }
52
53 /// Element-wise quantisation for a vector. Convenience wrapper
54 /// used by adopters feeding matrix rows into the density matrix.
55 pub fn vec_from_f64(v: &[f64]) -> Vec<FixedPoint> {
56 v.iter().copied().map(FixedPoint::from_f64).collect()
57 }
58
59 pub fn vec_to_f64(v: &[FixedPoint]) -> Vec<f64> {
60 v.iter().copied().map(FixedPoint::to_f64).collect()
61 }
62}
63
64impl From<f64> for FixedPoint {
65 fn from(v: f64) -> Self {
66 FixedPoint::from_f64(v)
67 }
68}
69
70impl From<FixedPoint> for f64 {
71 fn from(q: FixedPoint) -> Self {
72 q.to_f64()
73 }
74}
75
76/// A single short-term memory entry. Intentionally unopinionated
77/// about what adopters store — `payload` is arbitrary JSON, and
78/// `symbolic_refs` holds handles to external buffers (audio clip
79/// IDs, document checksums) without embedding the bytes.
80#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
81pub struct MemoryEntry {
82 pub key: String,
83 pub payload: Value,
84 #[serde(default, skip_serializing_if = "Vec::is_empty")]
85 pub symbolic_refs: Vec<String>,
86 #[serde(with = "chrono::serde::ts_milliseconds")]
87 pub stored_at: DateTime<Utc>,
88}
89
90/// The full snapshot the backend persists. Canonical shape —
91/// adopters serialise this via JSON (the simplest wire format
92/// consistent with the 10.g canonicaliser and the 11.c replay
93/// tokens); future revisions can swap to MessagePack without
94/// changing the structural guarantees.
95#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
96pub struct CognitiveState {
97 /// Stable identifier of the originating WebSocket session.
98 /// Clients presenting a reconnect must prove ownership via the
99 /// companion [`crate::pem::continuity_token::ContinuityToken`].
100 pub session_id: String,
101 /// Tenant slug so multi-tenant deployments route state to the
102 /// correct RLS-scoped backend. Also carried forward to the SAR
103 /// exporter in 10.l.
104 pub tenant_id: String,
105 /// Flow-execution identifier — matches `ReplayToken.flow_id`
106 /// from 11.c so an auditor can correlate snapshots with the
107 /// replay stream.
108 pub flow_id: String,
109 /// Optional subject / user identifier the state belongs to.
110 /// Consumed by the SAR exporter in 10.l; nullable when the flow
111 /// ran under a service account.
112 #[serde(default)]
113 pub subject_user_id: Option<String>,
114
115 /// The agent's probability amplitudes in row-major order. Q32.32
116 /// fixed-point means the same matrix round-trips identically
117 /// across N reconnects.
118 pub density_matrix: Vec<Vec<FixedPoint>>,
119 /// Free-form belief state the flow author chose to preserve.
120 /// Structured so the replay executor can re-seed an identical
121 /// posture after rehydration.
122 #[serde(default, skip_serializing_if = "Value::is_null")]
123 pub belief_state: Value,
124 /// Short-term memory. Capped in practice by the flow's
125 /// `@reconnect_window` TTL; buffers referenced here should be
126 /// symbolic, not bytes inline.
127 #[serde(default)]
128 pub short_term_memory: Vec<MemoryEntry>,
129
130 #[serde(with = "chrono::serde::ts_milliseconds")]
131 pub created_at: DateTime<Utc>,
132 #[serde(with = "chrono::serde::ts_milliseconds")]
133 pub last_updated_at: DateTime<Utc>,
134}
135
136impl CognitiveState {
137 /// Build a minimal state for a fresh session.
138 pub fn new(
139 session_id: impl Into<String>,
140 tenant_id: impl Into<String>,
141 flow_id: impl Into<String>,
142 ) -> Self {
143 // §Fase 12.c — JSON serialisation of `DateTime<Utc>` via
144 // chrono's default Serialize impl produces RFC 3339 with
145 // millisecond precision, so any sub-millisecond fraction in
146 // `Utc::now()` is silently dropped on the way to disk and the
147 // restored state compares unequal to the in-memory original.
148 // Truncating to ms here aligns the in-memory representation
149 // with the on-wire format, making persist→restore a strict
150 // identity — the same invariant Q32.32 gives us for floats.
151 let now = Utc::now().trunc_subsecs(3);
152 CognitiveState {
153 session_id: session_id.into(),
154 tenant_id: tenant_id.into(),
155 flow_id: flow_id.into(),
156 subject_user_id: None,
157 density_matrix: Vec::new(),
158 belief_state: Value::Null,
159 short_term_memory: Vec::new(),
160 created_at: now,
161 last_updated_at: now,
162 }
163 }
164
165 /// Snapshot this state to opaque bytes suitable for a backend
166 /// to store. JSON for 11.d — simple, debuggable, lets the
167 /// Postgres adapter query into `state.density_matrix[...]`
168 /// when auditors need it.
169 pub fn encode(&self) -> Vec<u8> {
170 serde_json::to_vec(self).expect("CognitiveState is always serialisable")
171 }
172
173 /// Reconstruct a state from bytes previously produced by
174 /// [`encode`]. Returns an error on any decode failure — adopters
175 /// surface the mismatch as a deliberate "cold start" rather
176 /// than silently corrupting the session.
177 pub fn decode(bytes: &[u8]) -> Result<Self, StateDecodeError> {
178 serde_json::from_slice(bytes)
179 .map_err(|e| StateDecodeError(e.to_string()))
180 }
181}
182
/// Error returned when stored bytes cannot be decoded into a cognitive
/// state. The wrapped string is the underlying decoder's message.
///
/// Derives `Clone`, `PartialEq` and `Eq` so the error can be compared
/// in assertions and cloned into logs or retries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StateDecodeError(pub String);

impl std::fmt::Display for StateDecodeError {
    /// Renders as `cognitive state decode failed: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let StateDecodeError(detail) = self;
        write!(f, "cognitive state decode failed: {detail}")
    }
}

impl std::error::Error for StateDecodeError {}
193
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use serde_json::json;

    /// Whole-second timestamp, so it passes through the millisecond
    /// wire format unchanged.
    fn fixed_ts() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 4, 22, 12, 0, 0).unwrap()
    }

    // ── FixedPoint ───────────────────────────────────────────────

    #[test]
    fn fixed_point_roundtrip_preserves_bit_pattern_across_n_hops() {
        let originals = [0.0_f64, 1e-5, 0.25, 0.5, 0.9999, 1.0];
        for v in originals {
            let q = FixedPoint::from_f64(v);
            let back = q.to_f64();
            // After N quantise/dequantise cycles, the value is
            // stable at the first cycle.
            let q2 = FixedPoint::from_f64(back);
            let back2 = q2.to_f64();
            assert_eq!(q, q2, "value {v} drifts between cycles");
            // Compare raw bits: float `==` would also accept values
            // that differ in representation (e.g. 0.0 vs -0.0).
            assert_eq!(
                back.to_bits(),
                back2.to_bits(),
                "value {v} changes bit pattern between cycles"
            );
        }
    }

    #[test]
    fn fixed_point_saturates_on_infinity() {
        let q_pos = FixedPoint::from_f64(f64::INFINITY);
        let q_neg = FixedPoint::from_f64(f64::NEG_INFINITY);
        assert_eq!(q_pos.0, i64::MAX);
        assert_eq!(q_neg.0, i64::MIN);
    }

    #[test]
    fn fixed_point_maps_nan_to_zero() {
        // NaN survives the clamp and the float-to-int cast turns it
        // into 0; pin that so it cannot change unnoticed.
        assert_eq!(FixedPoint::from_f64(f64::NAN).0, 0);
    }

    #[test]
    fn fixed_point_vec_helpers() {
        let v = vec![0.1, 0.25, 0.5];
        let q = FixedPoint::vec_from_f64(&v);
        let back = FixedPoint::vec_to_f64(&q);
        // Each element round-trips; the first cycle fixes the
        // representation so the second is identity.
        let q2 = FixedPoint::vec_from_f64(&back);
        assert_eq!(q, q2);
    }

    #[test]
    fn fixed_point_representable_precision_is_about_2e_minus_10() {
        // Two f64 values that differ by less than 2e-10 round-trip
        // to the same FixedPoint — this is the documented precision
        // ceiling; the test asserts it hasn't silently widened.
        let a = 0.5_f64;
        let b = a + 1e-11;
        assert_eq!(FixedPoint::from_f64(a), FixedPoint::from_f64(b));
    }

    // ── CognitiveState ────────────────────────────────────────────

    #[test]
    fn encode_decode_roundtrip() {
        let mut s = CognitiveState::new("sess-1", "alpha", "flow-1");
        s.created_at = fixed_ts();
        s.last_updated_at = fixed_ts();
        s.density_matrix = vec![FixedPoint::vec_from_f64(&[0.1, 0.9])];
        s.belief_state = json!({"confidence": 0.73});
        s.short_term_memory.push(MemoryEntry {
            key: "last_user_msg".into(),
            payload: json!({"text": "hi"}),
            symbolic_refs: vec!["audio-buf-17".into()],
            stored_at: fixed_ts(),
        });

        let bytes = s.encode();
        let decoded = CognitiveState::decode(&bytes).expect("decode");
        assert_eq!(decoded, s);
    }

    #[test]
    fn density_matrix_roundtrips_bit_identical_across_multiple_cycles() {
        let mut s = CognitiveState::new("sess", "alpha", "f");
        let original = vec![vec![0.1, 0.5, 0.9], vec![0.2, 0.3, 0.8]];
        s.density_matrix = original
            .iter()
            .map(|row| FixedPoint::vec_from_f64(row))
            .collect();

        // Three encode/decode cycles.
        let mut current = s.clone();
        for _ in 0..3 {
            let bytes = current.encode();
            current = CognitiveState::decode(&bytes).expect("decode");
        }
        assert_eq!(current.density_matrix, s.density_matrix);
    }

    #[test]
    fn decode_rejects_garbage() {
        let err = CognitiveState::decode(b"not json").unwrap_err();
        // The tuple field carries the decoder's own message; the
        // "decode failed" prefix is added only by `Display`, so each is
        // checked where it actually lives.
        assert!(!err.0.is_empty(), "decoder message should be carried through");
        assert!(err.to_string().contains("decode failed"));
    }

    #[test]
    fn optional_fields_default_cleanly() {
        let minimal = r#"{
            "session_id": "sess",
            "tenant_id": "alpha",
            "flow_id": "f",
            "density_matrix": [],
            "created_at": 1700000000000,
            "last_updated_at": 1700000000000
        }"#;
        let decoded = CognitiveState::decode(minimal.as_bytes()).unwrap();
        assert_eq!(decoded.subject_user_id, None);
        assert_eq!(decoded.belief_state, Value::Null);
        assert!(decoded.short_term_memory.is_empty());
    }
}