1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
//! `NetDb` — unified query façade over one or more CortEX models.
use serde::{Deserialize, Serialize};
use super::super::cortex::memories::MemoriesAdapter;
use super::super::cortex::tasks::TasksAdapter;
use super::super::redex::{Redex, RedexFileConfig};
use super::error::NetDbError;
/// Portable, postcard-serialisable bundle of per-model snapshots.
/// Returned by [`NetDb::snapshot`]; consumed by
/// [`NetDbBuilder::build_from_snapshot`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetDbSnapshot {
/// `Some((bytes, last_seq))` if tasks were included in the DB
/// and have been snapshotted; `None` otherwise.
pub tasks: Option<(Vec<u8>, Option<u64>)>,
/// Same, for memories.
pub memories: Option<(Vec<u8>, Option<u64>)>,
}
impl NetDbSnapshot {
/// Serialize the whole bundle into a single postcard blob for
/// persistence.
pub fn encode(&self) -> Result<Vec<u8>, NetDbError> {
postcard::to_allocvec(self).map_err(|e| NetDbError::Snapshot(e.to_string()))
}
/// Deserialize from a blob produced by [`Self::encode`].
pub fn decode(bytes: &[u8]) -> Result<Self, NetDbError> {
postcard::from_bytes(bytes).map_err(|e| NetDbError::Snapshot(e.to_string()))
}
}
/// Unified NetDB handle.
///
/// Bundles one or more CortEX adapters (tasks, memories, …) behind a
/// single handle. Construct via [`NetDb::builder`].
pub struct NetDb {
redex: Redex,
tasks: Option<TasksAdapter>,
memories: Option<MemoriesAdapter>,
}
impl NetDb {
/// Start building a NetDB.
pub fn builder(redex: Redex) -> NetDbBuilder {
NetDbBuilder {
redex,
origin_hash: 0,
persistent: false,
want_tasks: false,
want_memories: false,
}
}
/// Access the tasks model. Panics if `with_tasks()` wasn't
/// called on the builder. Use [`Self::try_tasks`] for a checked
/// accessor.
#[expect(
clippy::expect_used,
reason = "documented contract — callers must opt into tasks via builder; try_tasks is the checked accessor"
)]
pub fn tasks(&self) -> &TasksAdapter {
self.tasks
.as_ref()
.expect("NetDb: tasks not enabled — call `with_tasks()` on the builder")
}
/// Checked tasks accessor. Returns `None` if tasks were not
/// included at build time.
pub fn try_tasks(&self) -> Option<&TasksAdapter> {
self.tasks.as_ref()
}
/// Access the memories model. Panics if `with_memories()` wasn't
/// called.
#[expect(
clippy::expect_used,
reason = "documented contract — callers must opt into memories via builder; try_memories is the checked accessor"
)]
pub fn memories(&self) -> &MemoriesAdapter {
self.memories
.as_ref()
.expect("NetDb: memories not enabled — call `with_memories()` on the builder")
}
/// Checked memories accessor.
pub fn try_memories(&self) -> Option<&MemoriesAdapter> {
self.memories.as_ref()
}
/// Borrow the underlying `Redex` manager. Useful for lifecycle
/// operations (close a specific channel, sweep retention, etc.).
pub fn redex(&self) -> &Redex {
&self.redex
}
/// Close every enabled adapter. The underlying `Redex` files
/// stay open on the manager — reopening via another NetDb
/// against the same `Redex` instance replays or snapshots them.
/// Idempotent.
///
/// Both closes are attempted regardless of failure and the
/// FIRST error is surfaced as the function's return; the
/// SECOND error is logged at `warn` so a double-failure is
/// observable in tracing without conflating the typed
/// error surface. Pre-fix the second error was dropped
/// silently — operators investigating a `close()` failure
/// from the typed return would never see the second adapter's
/// error. The dominant double-failure mode is "underlying
/// redex already closed," which produces the same error
/// from both adapters and is uninteresting to disambiguate
/// in the typed return; the warn-log makes it observable
/// without adding a new error variant.
pub fn close(&self) -> Result<(), NetDbError> {
let tasks_result = self.tasks.as_ref().map(|t| t.close()).unwrap_or(Ok(()));
let memories_result = self.memories.as_ref().map(|m| m.close()).unwrap_or(Ok(()));
// Surface the first error; if both errored, the tasks one
// wins by convention (matches the pre-fix shape where tasks
// ran first). Log the second so it's not invisible.
match (tasks_result, memories_result) {
(Ok(()), Ok(())) => Ok(()),
(Err(e), Ok(())) => Err(e.into()),
(Ok(()), Err(e)) => Err(e.into()),
(Err(tasks_err), Err(memories_err)) => {
tracing::warn!(
tasks_error = %tasks_err,
memories_error = %memories_err,
"netdb close: both adapters failed; surfacing tasks error \
and logging memories error",
);
Err(tasks_err.into())
}
}
}
/// Capture a snapshot of every enabled model. Each model is
/// snapshotted under its own state lock (consistent per-model);
/// there is no cross-model consistency guarantee because each
/// model is a separate RedEX file.
///
/// **Cross-model ordering caveat.** Tasks are snapshotted
/// first, then memories. An ingest that lands in both models
/// between the two calls is captured by the memories snapshot
/// but missed by the tasks snapshot — so the resulting
/// `NetDbSnapshot` can split a logical "write to both models"
/// across the two halves. A watcher snapshotting between
/// event deliveries needs to either (a) treat each model's
/// `last_seq` as the authoritative ordering and reconcile
/// idempotently on restore, or (b) drain both models'
/// `wait_for_seq(known_last)` before calling `snapshot()` to
/// pin a deliberate cut-off. The lock order (tasks → memories)
/// is fixed so any future writer that takes both locks must
/// match this order to avoid deadlock.
pub fn snapshot(&self) -> Result<NetDbSnapshot, NetDbError> {
let tasks = if let Some(t) = &self.tasks {
Some(t.snapshot()?)
} else {
None
};
let memories = if let Some(m) = &self.memories {
Some(m.snapshot()?)
} else {
None
};
Ok(NetDbSnapshot { tasks, memories })
}
}
impl std::fmt::Debug for NetDb {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NetDb")
.field("tasks", &self.tasks.is_some())
.field("memories", &self.memories.is_some())
.finish()
}
}
/// Builder for [`NetDb`].
pub struct NetDbBuilder {
redex: Redex,
origin_hash: u64,
persistent: bool,
want_tasks: bool,
want_memories: bool,
}
impl NetDbBuilder {
/// Set the producer origin_hash stamped on every `EventMeta` by
/// the bundled adapters.
pub fn origin(mut self, origin_hash: u64) -> Self {
self.origin_hash = origin_hash;
self
}
/// When `true`, every enabled model's underlying RedEX file uses
/// `persistent: true`. Requires the `Redex` to have been
/// constructed with `with_persistent_dir(...)`.
pub fn persistent(mut self, persistent: bool) -> Self {
self.persistent = persistent;
self
}
/// Include the tasks model.
pub fn with_tasks(mut self) -> Self {
self.want_tasks = true;
self
}
/// Include the memories model.
pub fn with_memories(mut self) -> Self {
self.want_memories = true;
self
}
/// Build the NetDb. Opens each enabled model against the
/// underlying `Redex`.
///
/// # Failure atomicity
///
/// If the second adapter open fails, the first adapter is closed
/// before the error propagates so no orphan fold task outlives
/// the failed build. The `Redex` is dropped with the builder on
/// the error path — callers who want to retry without losing
/// shared state should construct a new `Redex` (retry on the
/// same manager is not available since the builder consumes it
/// by value).
///
/// The atomicity guarantee itself is code-level: the
/// close-on-error block below is the authoritative source of
/// truth. Integration tests exercise the observable error path
/// but cannot directly observe the closed first-adapter after
/// the Redex has been dropped.
pub async fn build(self) -> Result<NetDb, NetDbError> {
// Refuse a no-models build. Pre-fix this returned a no-op
// NetDb whose `tasks()` / `memories()` accessors panicked
// on first call; surface the config error as a typed
// `?` so a misconfigured profile or test fixture doesn't
// turn into a process panic at the first read.
if !self.want_tasks && !self.want_memories {
return Err(NetDbError::NoModelsEnabled);
}
let cfg = self.redex_config();
let tasks = if self.want_tasks {
Some(TasksAdapter::open_with_config(&self.redex, self.origin_hash, cfg.clone()).await?)
} else {
None
};
let memories = if self.want_memories {
match MemoriesAdapter::open_with_config(&self.redex, self.origin_hash, cfg).await {
Ok(m) => Some(m),
Err(e) => {
if let Some(t) = &tasks {
let _ = t.close();
}
return Err(e.into());
}
}
} else {
None
};
Ok(NetDb {
redex: self.redex,
tasks,
memories,
})
}
/// Like [`Self::build`], but restore each enabled model from the
/// corresponding entry in `snapshot`. A model enabled via
/// `with_*()` whose snapshot entry is `None` in the bundle is
/// opened from scratch via the normal open path (equivalent to
/// [`Self::build`] for that model).
///
/// Same failure-atomicity guarantee as [`Self::build`] — a
/// second-adapter failure closes the first before the error
/// propagates. See `build`'s docs for the caveat that the
/// failing Redex is dropped with the builder.
pub async fn build_from_snapshot(self, snapshot: &NetDbSnapshot) -> Result<NetDb, NetDbError> {
if !self.want_tasks && !self.want_memories {
return Err(NetDbError::NoModelsEnabled);
}
let cfg = self.redex_config();
let tasks = match (self.want_tasks, &snapshot.tasks) {
(true, Some((bytes, last_seq))) => Some(
TasksAdapter::open_from_snapshot_with_config(
&self.redex,
self.origin_hash,
cfg.clone(),
bytes,
*last_seq,
)
.await?,
),
(true, None) => Some(
TasksAdapter::open_with_config(&self.redex, self.origin_hash, cfg.clone()).await?,
),
(false, _) => None,
};
let memories_result = match (self.want_memories, &snapshot.memories) {
(true, Some((bytes, last_seq))) => Some(
MemoriesAdapter::open_from_snapshot_with_config(
&self.redex,
self.origin_hash,
cfg,
bytes,
*last_seq,
)
.await,
),
(true, None) => {
Some(MemoriesAdapter::open_with_config(&self.redex, self.origin_hash, cfg).await)
}
(false, _) => None,
};
let memories = match memories_result {
Some(Ok(m)) => Some(m),
Some(Err(e)) => {
if let Some(t) = &tasks {
let _ = t.close();
}
return Err(e.into());
}
None => None,
};
Ok(NetDb {
redex: self.redex,
tasks,
memories,
})
}
fn redex_config(&self) -> RedexFileConfig {
if self.persistent {
RedexFileConfig::default().with_persistent(true)
} else {
RedexFileConfig::default()
}
}
}