1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
//! Framework persistence operations.
//!
//! Extracted from framework.rs to satisfy the 500 LOC gate.
use tracing::warn;
use crate::error::Result;
use crate::framework::ChaoticSemanticFramework;
impl ChaoticSemanticFramework {
/// Persist all data to storage
#[tracing::instrument(err, skip(self))]
pub async fn persist(&self) -> Result<()> {
if let Some(ref persistence) = self.persistence {
let p_start = std::time::Instant::now();
// ADR-0068: Persist ANN index state
{
let sing = self.singularity.read().await;
let ns = self.namespace.read().await;
if let Some(ns_state) = sing.get_namespace(&ns) {
if let Ok(index_data) = ns_state.index.serialize() {
if !index_data.is_empty() {
persistence.save_index(&ns, "main", &index_data).await?;
}
}
}
}
persistence.checkpoint().await?;
self.metrics.observe_persist_latency_ms(
u64::try_from(p_start.elapsed().as_millis()).unwrap_or(u64::MAX),
"persist",
);
}
Ok(())
}
/// Verify persistence connectivity.
#[tracing::instrument(err, skip(self))]
pub async fn persistence_health_check(&self) -> Result<()> {
if let Some(ref persistence) = self.persistence {
persistence.health_check().await?;
}
Ok(())
}
/// Load and replace all in-memory state from persistence.
///
/// Clears existing state, loads persisted state. Use for fresh starts.
/// See also: [`load_merge`](Self::load_merge) for additive semantics.
#[allow(clippy::significant_drop_tightening)] // Lock held for concept injection and index rebuild
#[tracing::instrument(err, skip(self))]
pub async fn load_replace(&self) -> Result<()> {
let p_start = std::time::Instant::now();
if let Some(ref persistence) = self.persistence {
let ns = self.namespace.read().await;
let concepts = persistence.load_all_concepts(&ns).await?;
for concept in &concepts {
self.validate_concept(concept)?;
}
let mut all_associations: Vec<(String, String, f32)> = Vec::new();
for concept in &concepts {
let links = persistence.load_associations(&ns, &concept.id).await?;
for (to_id, strength) in links {
all_associations.push((concept.id.clone(), to_id, strength));
}
}
{
let mut sing = self.singularity.write().await;
sing.clear(&ns);
for concept in concepts {
sing.inject(&ns, concept)?;
}
for (from_id, to_id, strength) in all_associations {
if let Err(error) = sing.associate(&ns, &from_id, &to_id, strength) {
warn!(
from_id = %from_id,
to_id = %to_id,
strength,
error = %error,
"skipping invalid association during load_replace"
);
}
}
}
// ADR-0068: Load ANN index state
// Prefer deserialize over rebuild if data is fresh.
// Propagate errors instead of silently ignoring (fixes test regression).
if let Ok(Some(index_data)) = persistence.load_index(&ns, "main").await {
{
let mut sing = self.singularity.write().await;
let ns_state = sing.get_namespace_mut(&ns);
ns_state.index.deserialize(&index_data)?;
}
} else {
// Fallback: rebuild index from concepts
{
let mut sing = self.singularity.write().await;
let ns_state = sing.get_namespace_mut(&ns);
let concepts_map = ns_state.concepts.clone();
ns_state.index.rebuild(&concepts_map)?;
}
}
self.metrics.observe_persist_latency_ms(
u64::try_from(p_start.elapsed().as_millis()).unwrap_or(u64::MAX),
"load",
);
}
Ok(())
}
/// Load and merge persisted state into in-memory state.
///
/// Preserves existing state, adds persisted state on top.
/// See also: [`load_replace`](Self::load_replace) for replacement semantics.
#[allow(clippy::significant_drop_tightening)] // Lock held for concept injection and index rebuild
#[tracing::instrument(err, skip(self))]
pub async fn load_merge(&self) -> Result<()> {
if let Some(ref persistence) = self.persistence {
let ns = self.namespace.read().await;
let concepts = persistence.load_all_concepts(&ns).await?;
for concept in &concepts {
self.validate_concept(concept)?;
}
{
let mut sing = self.singularity.write().await;
for concept in &concepts {
if sing.get(&ns, &concept.id).is_some() {
warn!(
concept_id = %concept.id,
"skipping persisted concept during load_merge because id already exists in memory"
);
continue;
}
sing.inject(&ns, (*concept).clone())?;
}
}
let mut all_associations: Vec<(String, String, f32)> = Vec::new();
for concept in &concepts {
let links = persistence.load_associations(&ns, &concept.id).await?;
for (to_id, strength) in links {
all_associations.push((concept.id.clone(), to_id, strength));
}
}
{
let mut sing = self.singularity.write().await;
for (from_id, to_id, strength) in all_associations {
if let Err(error) = sing.associate(&ns, &from_id, &to_id, strength) {
warn!(
from_id = %from_id,
to_id = %to_id,
strength,
error = %error,
"skipping invalid association during load_merge"
);
}
}
}
// ADR-0068: Load ANN index state
// We just injected new concepts into the index via sing.inject(),
// so the index is already updated with merged concepts.
// Rebuilding ensures optimal structure if many concepts were merged.
// Propagate errors instead of silently ignoring.
if let Ok(Some(index_data)) = persistence.load_index(&ns, "main").await {
{
let mut sing = self.singularity.write().await;
let ns_state = sing.get_namespace_mut(&ns);
ns_state.index.deserialize(&index_data)?;
}
} else {
// Fallback: rebuild index from concepts
{
let mut sing = self.singularity.write().await;
let ns_state = sing.get_namespace_mut(&ns);
let concepts_map = ns_state.concepts.clone();
ns_state.index.rebuild(&concepts_map)?;
}
}
}
Ok(())
}
}