1use super::*;
4
5impl ElephantMemoryStoreAdapter {
6 pub(super) fn from_config(
7 config: &ElephantMemoryBackendConfig,
8 ) -> Result<Self, ElephantMemoryStoreError> {
9 let endpoint = config.endpoint.trim();
10 if endpoint.is_empty() {
11 return Err(ElephantMemoryStoreError::InvalidConfig(
12 "memory backend endpoint must not be empty".to_string(),
13 ));
14 }
15 let state_path = config.state_path.trim();
16 if state_path.is_empty() {
17 return Err(ElephantMemoryStoreError::InvalidConfig(
18 "memory backend state_path must not be empty".to_string(),
19 ));
20 }
21 Ok(Self {
22 endpoint: endpoint.to_string(),
23 state_path: PathBuf::from(state_path),
24 })
25 }
26
27 fn ensure_remote_health(&self) -> Result<(), ElephantMemoryStoreError> {
28 let health_url = format!("{}/v1/health", self.endpoint.trim_end_matches('/'));
29 let parsed = parse_http_url(&health_url)?;
30 let authority = format!("{}:{}", parsed.host, parsed.port);
31 let mut addrs = authority.to_socket_addrs().map_err(|err| {
32 ElephantMemoryStoreError::ExternalCallFailed(format!(
33 "healthcheck resolve failed for '{health_url}': {err}"
34 ))
35 })?;
36 let addr = addrs.next().ok_or_else(|| {
37 ElephantMemoryStoreError::ExternalCallFailed(format!(
38 "healthcheck resolve failed for '{health_url}': no socket addresses"
39 ))
40 })?;
41 let mut stream =
42 TcpStream::connect_timeout(&addr, ELEPHANT_HEALTHCHECK_TIMEOUT).map_err(|err| {
43 ElephantMemoryStoreError::ExternalCallFailed(format!(
44 "healthcheck connect failed for '{health_url}': {err}"
45 ))
46 })?;
47 stream
48 .set_read_timeout(Some(ELEPHANT_HEALTHCHECK_TIMEOUT))
49 .map_err(|err| {
50 ElephantMemoryStoreError::ExternalCallFailed(format!(
51 "healthcheck timeout setup failed for '{health_url}': {err}"
52 ))
53 })?;
54 stream
55 .set_write_timeout(Some(ELEPHANT_HEALTHCHECK_TIMEOUT))
56 .map_err(|err| {
57 ElephantMemoryStoreError::ExternalCallFailed(format!(
58 "healthcheck timeout setup failed for '{health_url}': {err}"
59 ))
60 })?;
61 let request = format!(
62 "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
63 parsed.path, parsed.host
64 );
65 stream.write_all(request.as_bytes()).map_err(|err| {
66 ElephantMemoryStoreError::ExternalCallFailed(format!(
67 "healthcheck write failed for '{health_url}': {err}"
68 ))
69 })?;
70 let mut reader = BufReader::new(stream);
71 let mut status_line = String::new();
72 let bytes_read = reader.read_line(&mut status_line).map_err(|err| {
73 ElephantMemoryStoreError::ExternalCallFailed(format!(
74 "healthcheck read failed for '{health_url}': {err}"
75 ))
76 })?;
77 if bytes_read == 0 {
78 return Err(ElephantMemoryStoreError::ExternalCallFailed(format!(
79 "healthcheck read failed for '{health_url}': empty response"
80 )));
81 }
82 let status_code = status_line
83 .split_whitespace()
84 .nth(1)
85 .and_then(|value| value.parse::<u16>().ok())
86 .ok_or_else(|| {
87 ElephantMemoryStoreError::ExternalCallFailed(format!(
88 "healthcheck parse failed for '{health_url}': invalid status line '{}'",
89 status_line.trim()
90 ))
91 })?;
92 if (200..300).contains(&status_code) {
93 Ok(())
94 } else {
95 Err(ElephantMemoryStoreError::ExternalCallFailed(format!(
96 "healthcheck status failed for '{health_url}': HTTP {status_code}"
97 )))
98 }
99 }
100
101 pub(super) fn read_state(&self) -> Result<PersistedMemoryState, ElephantMemoryStoreError> {
102 self.ensure_remote_health()?;
103 if !self.state_path.exists() {
104 return Ok(PersistedMemoryState::default());
105 }
106 let bytes = fs::read(&self.state_path)
107 .map_err(|err| ElephantMemoryStoreError::Io(err.to_string()))?;
108 serde_json::from_slice::<PersistedMemoryState>(&bytes)
109 .map_err(|err| ElephantMemoryStoreError::InvalidStoreData(err.to_string()))
110 }
111
112 fn write_state(&self, state: &PersistedMemoryState) -> Result<(), ElephantMemoryStoreError> {
113 self.ensure_remote_health()?;
114 if let Some(parent) = self.state_path.parent() {
115 fs::create_dir_all(parent)
116 .map_err(|err| ElephantMemoryStoreError::Io(err.to_string()))?;
117 }
118 let tmp_path = self.state_path.with_extension("tmp");
119 let json = serde_json::to_vec_pretty(state)
120 .map_err(|err| ElephantMemoryStoreError::Serialize(err.to_string()))?;
121 fs::write(&tmp_path, json).map_err(|err| ElephantMemoryStoreError::Io(err.to_string()))?;
122 fs::rename(&tmp_path, &self.state_path)
123 .map_err(|err| ElephantMemoryStoreError::Io(err.to_string()))?;
124 Ok(())
125 }
126}
127
128#[derive(Debug, Clone, PartialEq, Eq)]
129struct ParsedHttpUrl {
130 host: String,
131 port: u16,
132 path: String,
133}
134
135fn parse_http_url(url: &str) -> Result<ParsedHttpUrl, ElephantMemoryStoreError> {
136 let trimmed = url.trim();
137 let without_scheme = trimmed.strip_prefix("http://").ok_or_else(|| {
138 ElephantMemoryStoreError::InvalidConfig(format!(
139 "memory backend endpoint must start with http:// (got '{trimmed}')"
140 ))
141 })?;
142 if without_scheme.is_empty() {
143 return Err(ElephantMemoryStoreError::InvalidConfig(
144 "memory backend endpoint host must not be empty".to_string(),
145 ));
146 }
147 let (authority, path_suffix) = without_scheme
148 .split_once('/')
149 .map(|(left, right)| (left, format!("/{right}")))
150 .unwrap_or((without_scheme, "/".to_string()));
151 if authority.is_empty() {
152 return Err(ElephantMemoryStoreError::InvalidConfig(
153 "memory backend endpoint host must not be empty".to_string(),
154 ));
155 }
156 let (host, port) = match authority.rsplit_once(':') {
157 Some((host, raw_port))
158 if !host.is_empty() && raw_port.chars().all(|c| c.is_ascii_digit()) =>
159 {
160 let parsed = raw_port.parse::<u16>().map_err(|_| {
161 ElephantMemoryStoreError::InvalidConfig(format!(
162 "memory backend endpoint port is invalid in '{trimmed}'"
163 ))
164 })?;
165 (host.to_string(), parsed)
166 }
167 _ => (authority.to_string(), 80_u16),
168 };
169 if host.is_empty() {
170 return Err(ElephantMemoryStoreError::InvalidConfig(
171 "memory backend endpoint host must not be empty".to_string(),
172 ));
173 }
174 Ok(ParsedHttpUrl {
175 host,
176 port,
177 path: path_suffix,
178 })
179}
180
181impl MobkitRuntimeHandle {
182 fn next_memory_sequence(&mut self) -> u64 {
183 Self::next_sequence(&mut self.memory_sequence)
184 }
185
186 pub(super) fn canonical_memory_token(raw: &str) -> Option<String> {
187 let token = raw.trim().to_ascii_lowercase();
188 if token.is_empty() { None } else { Some(token) }
189 }
190
191 pub(super) fn canonical_memory_store(raw: &str) -> Option<String> {
192 let store = Self::canonical_memory_token(raw)?;
193 if MEMORY_SUPPORTED_STORES.contains(&store.as_str()) {
194 Some(store)
195 } else {
196 None
197 }
198 }
199
200 fn default_memory_store() -> String {
201 "knowledge_graph".to_string()
202 }
203
204 pub(super) fn memory_conflict_for_reference(
205 &self,
206 entity: Option<&str>,
207 topic: Option<&str>,
208 ) -> Option<MemoryConflictSignal> {
209 let canonical_entity = entity.and_then(Self::canonical_memory_token);
210 let canonical_topic = topic.and_then(Self::canonical_memory_token);
211 match (canonical_entity, canonical_topic) {
212 (Some(entity), Some(topic)) => self
213 .memory_conflicts
214 .values()
215 .find(|signal| signal.entity == entity && signal.topic == topic)
216 .cloned(),
217 (Some(entity), None) => self
218 .memory_conflicts
219 .values()
220 .find(|signal| signal.entity == entity)
221 .cloned(),
222 (None, Some(topic)) => self
223 .memory_conflicts
224 .values()
225 .find(|signal| signal.topic == topic)
226 .cloned(),
227 (None, None) => None,
228 }
229 }
230 pub fn memory_stores(&self) -> Vec<MemoryStoreInfo> {
231 MEMORY_SUPPORTED_STORES
232 .iter()
233 .map(|store| MemoryStoreInfo {
234 store: (*store).to_string(),
235 record_count: self
236 .memory_assertions
237 .iter()
238 .filter(|assertion| assertion.store == *store)
239 .count()
240 + self
241 .memory_conflicts
242 .values()
243 .filter(|signal| signal.store == *store)
244 .count(),
245 })
246 .collect()
247 }
248
249 fn persist_memory_state(&self) -> Result<(), MemoryIndexError> {
250 let Some(backend) = self.memory_backend.as_ref() else {
251 return Ok(());
252 };
253 let state = PersistedMemoryState {
254 assertions: self.memory_assertions.clone(),
255 conflicts: self.memory_conflicts.values().cloned().collect::<Vec<_>>(),
256 };
257 backend
258 .write_state(&state)
259 .map_err(MemoryIndexError::BackendPersistFailed)
260 }
261
262 pub fn memory_index(
263 &mut self,
264 request: MemoryIndexRequest,
265 ) -> Result<MemoryIndexResult, MemoryIndexError> {
266 let entity = Self::canonical_memory_token(&request.entity)
267 .ok_or(MemoryIndexError::EntityRequired)?;
268 let topic =
269 Self::canonical_memory_token(&request.topic).ok_or(MemoryIndexError::TopicRequired)?;
270 let store = match request.store.as_deref() {
271 None => Self::default_memory_store(),
272 Some(raw_store) => Self::canonical_memory_store(raw_store)
273 .ok_or_else(|| MemoryIndexError::UnsupportedStore(raw_store.trim().to_string()))?,
274 };
275 let fact = request
276 .fact
277 .as_deref()
278 .map(str::trim)
279 .filter(|value| !value.is_empty())
280 .map(ToString::to_string);
281 let conflict = request.conflict.unwrap_or(false);
282 if fact.is_none() && !conflict {
283 return Err(MemoryIndexError::FactRequiredWhenConflictUnset);
284 }
285
286 let previous_memory_assertions = self.memory_assertions.clone();
287 let previous_memory_conflicts = self.memory_conflicts.clone();
288 let previous_memory_sequence = self.memory_sequence;
289
290 let mut assertion_id = None;
291 if let Some(fact) = fact {
292 let assertion_sequence = self.next_memory_sequence();
293 let assertion = MemoryAssertion {
294 assertion_id: format!("memory-assert-{assertion_sequence:06}"),
295 entity: entity.clone(),
296 topic: topic.clone(),
297 store: store.clone(),
298 fact,
299 metadata: request.metadata.clone(),
300 indexed_at_ms: current_time_ms(),
301 };
302 assertion_id = Some(assertion.assertion_id.clone());
303 self.memory_assertions.push(assertion);
304 while self.memory_assertions.len() > MEMORY_ASSERTIONS_MAX_RETAINED {
305 self.memory_assertions.remove(0);
306 }
307 }
308
309 if conflict {
310 let conflict_key = MemoryConflictKey {
311 entity: entity.clone(),
312 topic: topic.clone(),
313 store: store.clone(),
314 };
315 self.memory_conflicts.insert(
316 conflict_key,
317 MemoryConflictSignal {
318 entity: entity.clone(),
319 topic: topic.clone(),
320 store: store.clone(),
321 reason: request
322 .conflict_reason
323 .as_deref()
324 .map(str::trim)
325 .filter(|value| !value.is_empty())
326 .map(ToString::to_string),
327 updated_at_ms: current_time_ms(),
328 },
329 );
330 }
331 if let Err(error) = self.persist_memory_state() {
332 self.memory_assertions = previous_memory_assertions;
333 self.memory_conflicts = previous_memory_conflicts;
334 self.memory_sequence = previous_memory_sequence;
335 return Err(error);
336 }
337
338 let conflict_active = self
339 .memory_conflict_for_reference(Some(entity.as_str()), Some(topic.as_str()))
340 .is_some();
341
342 Ok(MemoryIndexResult {
343 entity,
344 topic,
345 store,
346 assertion_id,
347 conflict_active,
348 })
349 }
350 pub fn memory_query(&self, request: MemoryQueryRequest) -> MemoryQueryResult {
351 let entity = request
352 .entity
353 .as_deref()
354 .and_then(Self::canonical_memory_token);
355 let topic = request
356 .topic
357 .as_deref()
358 .and_then(Self::canonical_memory_token);
359 let store = request
360 .store
361 .as_deref()
362 .and_then(Self::canonical_memory_store);
363 let assertions = self
364 .memory_assertions
365 .iter()
366 .filter(|assertion| {
367 entity
368 .as_ref()
369 .is_none_or(|value| assertion.entity.as_str() == value.as_str())
370 })
371 .filter(|assertion| {
372 topic
373 .as_ref()
374 .is_none_or(|value| assertion.topic.as_str() == value.as_str())
375 })
376 .filter(|assertion| {
377 store
378 .as_ref()
379 .is_none_or(|value| assertion.store.as_str() == value.as_str())
380 })
381 .cloned()
382 .collect::<Vec<_>>();
383 let conflicts = self
384 .memory_conflicts
385 .values()
386 .filter(|signal| {
387 entity
388 .as_ref()
389 .is_none_or(|value| signal.entity.as_str() == value.as_str())
390 })
391 .filter(|signal| {
392 topic
393 .as_ref()
394 .is_none_or(|value| signal.topic.as_str() == value.as_str())
395 })
396 .filter(|signal| {
397 store
398 .as_ref()
399 .is_none_or(|value| signal.store.as_str() == value.as_str())
400 })
401 .cloned()
402 .collect::<Vec<_>>();
403 MemoryQueryResult {
404 assertions,
405 conflicts,
406 }
407 }
408}