1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
//! `MemoriesFold` — decodes `EventMeta` + payload, routes on dispatch,
//! mutates [`super::state::MemoriesState`].
use super::super::super::redex::{RedexError, RedexEvent, RedexFold};
use super::super::meta::{
compute_checksum, compute_checksum_with_meta, EventMeta, EVENT_META_SIZE,
};
use super::dispatch::{
DISPATCH_MEMORY_DELETED, DISPATCH_MEMORY_PINNED, DISPATCH_MEMORY_RETAGGED,
DISPATCH_MEMORY_STORED, DISPATCH_MEMORY_UNPINNED,
};
use super::state::MemoriesState;
use super::types::{
Memory, MemoryDeletedPayload, MemoryPinTogglePayload, MemoryRetaggedPayload,
MemoryStoredPayload,
};
/// Fold implementation for the memories model.
pub struct MemoriesFold;
impl RedexFold<MemoriesState> for MemoriesFold {
fn apply(&mut self, ev: &RedexEvent, state: &mut MemoriesState) -> Result<(), RedexError> {
// Per-event decode failures use `RedexError::Decode` (a
// recoverable variant) so the `Stop` fold policy
// skip-and-continues instead of permanently halting on a
// single bad event. See `tasks/fold.rs` for the full
// rationale.
if ev.payload.len() < EVENT_META_SIZE {
return Err(RedexError::Decode(format!(
"memories payload too short: {} bytes (need >= {})",
ev.payload.len(),
EVENT_META_SIZE
)));
}
let meta = EventMeta::from_bytes(&ev.payload[..EVENT_META_SIZE])
.ok_or_else(|| RedexError::Decode("bad EventMeta prefix".into()))?;
let tail = &ev.payload[EVENT_META_SIZE..];
// Verify the corruption-detection checksum stamped at
// ingest against the bytes we received from RedEX.
//
// Try the v2 (header + tail) checksum first, fall back
// to v1 (tail-only) for records written by pre-v2
// adapters. New writes pass v2 and detect bit-flips in
// the header (e.g. a `STORED → DELETED` dispatch flip);
// legacy records pass v1 with the original undercoverage
// gap documented as a known limitation. See
// `compute_checksum`'s
// doc for the full scope and migration story.
let v2_expected = compute_checksum_with_meta(&meta, tail);
let valid = if meta.checksum == v2_expected {
true
} else {
// Fallback for legacy records.
meta.checksum == compute_checksum(tail)
};
if !valid {
return Err(RedexError::Decode(format!(
"memories fold: EventMeta checksum mismatch at seq {} (got {:#010x}, v2 expected {:#010x})",
ev.entry.seq, meta.checksum, v2_expected
)));
}
match meta.dispatch {
DISPATCH_MEMORY_STORED => {
let p: MemoryStoredPayload =
postcard::from_bytes(tail).map_err(|e| RedexError::Decode(e.to_string()))?;
// Treat STORED as a content-update for an
// existing id: preserve `pinned` and `created_ns`,
// advance `updated_ns`, and overwrite the rest. A
// blanket `insert` would silently replace any
// existing entry, so `memories.store(42, "updated",
// ...)` after `memories.pin(42)` would drop the pin
// flag and overwrite the original creation
// timestamp with no observable signal to the
// operator.
// Per perf #96: values are `Arc<Memory>`. Mutate
// through `Arc::make_mut` so the unique case
// (no outstanding readers) mutates in place and
// the shared case clones-on-write to preserve the
// reader's snapshot.
if let Some(existing) = state.memories.get_mut(&p.id) {
let m = std::sync::Arc::make_mut(existing);
m.content = p.content;
m.tags = p.tags;
m.source = p.source;
m.updated_ns = p.now_ns;
// pinned + created_ns intentionally preserved.
} else {
state.memories.insert(
p.id,
std::sync::Arc::new(Memory {
id: p.id,
content: p.content,
tags: p.tags,
source: p.source,
created_ns: p.now_ns,
updated_ns: p.now_ns,
pinned: false,
}),
);
}
}
DISPATCH_MEMORY_RETAGGED => {
let p: MemoryRetaggedPayload =
postcard::from_bytes(tail).map_err(|e| RedexError::Decode(e.to_string()))?;
if let Some(existing) = state.memories.get_mut(&p.id) {
let m = std::sync::Arc::make_mut(existing);
m.tags = p.tags;
m.updated_ns = p.now_ns;
}
}
DISPATCH_MEMORY_PINNED => {
let p: MemoryPinTogglePayload =
postcard::from_bytes(tail).map_err(|e| RedexError::Decode(e.to_string()))?;
if let Some(existing) = state.memories.get_mut(&p.id) {
let m = std::sync::Arc::make_mut(existing);
m.pinned = true;
m.updated_ns = p.now_ns;
}
}
DISPATCH_MEMORY_UNPINNED => {
let p: MemoryPinTogglePayload =
postcard::from_bytes(tail).map_err(|e| RedexError::Decode(e.to_string()))?;
if let Some(existing) = state.memories.get_mut(&p.id) {
let m = std::sync::Arc::make_mut(existing);
m.pinned = false;
m.updated_ns = p.now_ns;
}
}
DISPATCH_MEMORY_DELETED => {
let p: MemoryDeletedPayload =
postcard::from_bytes(tail).map_err(|e| RedexError::Decode(e.to_string()))?;
state.memories.remove(&p.id);
}
other => {
tracing::debug!(
dispatch = other,
seq = ev.entry.seq,
"memories fold: ignoring unknown dispatch"
);
}
}
Ok(())
}
}