Skip to main content

meerkat_mobkit/runtime/
memory.rs

1//! Memory subsystem — Elephant-backed store adapter and query execution.
2
3use super::*;
4
5impl ElephantMemoryStoreAdapter {
6    pub(super) fn from_config(
7        config: &ElephantMemoryBackendConfig,
8    ) -> Result<Self, ElephantMemoryStoreError> {
9        let endpoint = config.endpoint.trim();
10        if endpoint.is_empty() {
11            return Err(ElephantMemoryStoreError::InvalidConfig(
12                "memory backend endpoint must not be empty".to_string(),
13            ));
14        }
15        let state_path = config.state_path.trim();
16        if state_path.is_empty() {
17            return Err(ElephantMemoryStoreError::InvalidConfig(
18                "memory backend state_path must not be empty".to_string(),
19            ));
20        }
21        Ok(Self {
22            endpoint: endpoint.to_string(),
23            state_path: PathBuf::from(state_path),
24        })
25    }
26
27    fn ensure_remote_health(&self) -> Result<(), ElephantMemoryStoreError> {
28        let health_url = format!("{}/v1/health", self.endpoint.trim_end_matches('/'));
29        let parsed = parse_http_url(&health_url)?;
30        let authority = format!("{}:{}", parsed.host, parsed.port);
31        let mut addrs = authority.to_socket_addrs().map_err(|err| {
32            ElephantMemoryStoreError::ExternalCallFailed(format!(
33                "healthcheck resolve failed for '{health_url}': {err}"
34            ))
35        })?;
36        let addr = addrs.next().ok_or_else(|| {
37            ElephantMemoryStoreError::ExternalCallFailed(format!(
38                "healthcheck resolve failed for '{health_url}': no socket addresses"
39            ))
40        })?;
41        let mut stream =
42            TcpStream::connect_timeout(&addr, ELEPHANT_HEALTHCHECK_TIMEOUT).map_err(|err| {
43                ElephantMemoryStoreError::ExternalCallFailed(format!(
44                    "healthcheck connect failed for '{health_url}': {err}"
45                ))
46            })?;
47        stream
48            .set_read_timeout(Some(ELEPHANT_HEALTHCHECK_TIMEOUT))
49            .map_err(|err| {
50                ElephantMemoryStoreError::ExternalCallFailed(format!(
51                    "healthcheck timeout setup failed for '{health_url}': {err}"
52                ))
53            })?;
54        stream
55            .set_write_timeout(Some(ELEPHANT_HEALTHCHECK_TIMEOUT))
56            .map_err(|err| {
57                ElephantMemoryStoreError::ExternalCallFailed(format!(
58                    "healthcheck timeout setup failed for '{health_url}': {err}"
59                ))
60            })?;
61        let request = format!(
62            "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
63            parsed.path, parsed.host
64        );
65        stream.write_all(request.as_bytes()).map_err(|err| {
66            ElephantMemoryStoreError::ExternalCallFailed(format!(
67                "healthcheck write failed for '{health_url}': {err}"
68            ))
69        })?;
70        let mut reader = BufReader::new(stream);
71        let mut status_line = String::new();
72        let bytes_read = reader.read_line(&mut status_line).map_err(|err| {
73            ElephantMemoryStoreError::ExternalCallFailed(format!(
74                "healthcheck read failed for '{health_url}': {err}"
75            ))
76        })?;
77        if bytes_read == 0 {
78            return Err(ElephantMemoryStoreError::ExternalCallFailed(format!(
79                "healthcheck read failed for '{health_url}': empty response"
80            )));
81        }
82        let status_code = status_line
83            .split_whitespace()
84            .nth(1)
85            .and_then(|value| value.parse::<u16>().ok())
86            .ok_or_else(|| {
87                ElephantMemoryStoreError::ExternalCallFailed(format!(
88                    "healthcheck parse failed for '{health_url}': invalid status line '{}'",
89                    status_line.trim()
90                ))
91            })?;
92        if (200..300).contains(&status_code) {
93            Ok(())
94        } else {
95            Err(ElephantMemoryStoreError::ExternalCallFailed(format!(
96                "healthcheck status failed for '{health_url}': HTTP {status_code}"
97            )))
98        }
99    }
100
101    pub(super) fn read_state(&self) -> Result<PersistedMemoryState, ElephantMemoryStoreError> {
102        self.ensure_remote_health()?;
103        if !self.state_path.exists() {
104            return Ok(PersistedMemoryState::default());
105        }
106        let bytes = fs::read(&self.state_path)
107            .map_err(|err| ElephantMemoryStoreError::Io(err.to_string()))?;
108        serde_json::from_slice::<PersistedMemoryState>(&bytes)
109            .map_err(|err| ElephantMemoryStoreError::InvalidStoreData(err.to_string()))
110    }
111
112    fn write_state(&self, state: &PersistedMemoryState) -> Result<(), ElephantMemoryStoreError> {
113        self.ensure_remote_health()?;
114        if let Some(parent) = self.state_path.parent() {
115            fs::create_dir_all(parent)
116                .map_err(|err| ElephantMemoryStoreError::Io(err.to_string()))?;
117        }
118        let tmp_path = self.state_path.with_extension("tmp");
119        let json = serde_json::to_vec_pretty(state)
120            .map_err(|err| ElephantMemoryStoreError::Serialize(err.to_string()))?;
121        fs::write(&tmp_path, json).map_err(|err| ElephantMemoryStoreError::Io(err.to_string()))?;
122        fs::rename(&tmp_path, &self.state_path)
123            .map_err(|err| ElephantMemoryStoreError::Io(err.to_string()))?;
124        Ok(())
125    }
126}
127
128#[derive(Debug, Clone, PartialEq, Eq)]
129struct ParsedHttpUrl {
130    host: String,
131    port: u16,
132    path: String,
133}
134
135fn parse_http_url(url: &str) -> Result<ParsedHttpUrl, ElephantMemoryStoreError> {
136    let trimmed = url.trim();
137    let without_scheme = trimmed.strip_prefix("http://").ok_or_else(|| {
138        ElephantMemoryStoreError::InvalidConfig(format!(
139            "memory backend endpoint must start with http:// (got '{trimmed}')"
140        ))
141    })?;
142    if without_scheme.is_empty() {
143        return Err(ElephantMemoryStoreError::InvalidConfig(
144            "memory backend endpoint host must not be empty".to_string(),
145        ));
146    }
147    let (authority, path_suffix) = without_scheme
148        .split_once('/')
149        .map(|(left, right)| (left, format!("/{right}")))
150        .unwrap_or((without_scheme, "/".to_string()));
151    if authority.is_empty() {
152        return Err(ElephantMemoryStoreError::InvalidConfig(
153            "memory backend endpoint host must not be empty".to_string(),
154        ));
155    }
156    let (host, port) = match authority.rsplit_once(':') {
157        Some((host, raw_port))
158            if !host.is_empty() && raw_port.chars().all(|c| c.is_ascii_digit()) =>
159        {
160            let parsed = raw_port.parse::<u16>().map_err(|_| {
161                ElephantMemoryStoreError::InvalidConfig(format!(
162                    "memory backend endpoint port is invalid in '{trimmed}'"
163                ))
164            })?;
165            (host.to_string(), parsed)
166        }
167        _ => (authority.to_string(), 80_u16),
168    };
169    if host.is_empty() {
170        return Err(ElephantMemoryStoreError::InvalidConfig(
171            "memory backend endpoint host must not be empty".to_string(),
172        ));
173    }
174    Ok(ParsedHttpUrl {
175        host,
176        port,
177        path: path_suffix,
178    })
179}
180
181impl MobkitRuntimeHandle {
182    fn next_memory_sequence(&mut self) -> u64 {
183        Self::next_sequence(&mut self.memory_sequence)
184    }
185
186    pub(super) fn canonical_memory_token(raw: &str) -> Option<String> {
187        let token = raw.trim().to_ascii_lowercase();
188        if token.is_empty() { None } else { Some(token) }
189    }
190
191    pub(super) fn canonical_memory_store(raw: &str) -> Option<String> {
192        let store = Self::canonical_memory_token(raw)?;
193        if MEMORY_SUPPORTED_STORES.contains(&store.as_str()) {
194            Some(store)
195        } else {
196            None
197        }
198    }
199
200    fn default_memory_store() -> String {
201        "knowledge_graph".to_string()
202    }
203
204    pub(super) fn memory_conflict_for_reference(
205        &self,
206        entity: Option<&str>,
207        topic: Option<&str>,
208    ) -> Option<MemoryConflictSignal> {
209        let canonical_entity = entity.and_then(Self::canonical_memory_token);
210        let canonical_topic = topic.and_then(Self::canonical_memory_token);
211        match (canonical_entity, canonical_topic) {
212            (Some(entity), Some(topic)) => self
213                .memory_conflicts
214                .values()
215                .find(|signal| signal.entity == entity && signal.topic == topic)
216                .cloned(),
217            (Some(entity), None) => self
218                .memory_conflicts
219                .values()
220                .find(|signal| signal.entity == entity)
221                .cloned(),
222            (None, Some(topic)) => self
223                .memory_conflicts
224                .values()
225                .find(|signal| signal.topic == topic)
226                .cloned(),
227            (None, None) => None,
228        }
229    }
230    pub fn memory_stores(&self) -> Vec<MemoryStoreInfo> {
231        MEMORY_SUPPORTED_STORES
232            .iter()
233            .map(|store| MemoryStoreInfo {
234                store: (*store).to_string(),
235                record_count: self
236                    .memory_assertions
237                    .iter()
238                    .filter(|assertion| assertion.store == *store)
239                    .count()
240                    + self
241                        .memory_conflicts
242                        .values()
243                        .filter(|signal| signal.store == *store)
244                        .count(),
245            })
246            .collect()
247    }
248
249    fn persist_memory_state(&self) -> Result<(), MemoryIndexError> {
250        let Some(backend) = self.memory_backend.as_ref() else {
251            return Ok(());
252        };
253        let state = PersistedMemoryState {
254            assertions: self.memory_assertions.clone(),
255            conflicts: self.memory_conflicts.values().cloned().collect::<Vec<_>>(),
256        };
257        backend
258            .write_state(&state)
259            .map_err(MemoryIndexError::BackendPersistFailed)
260    }
261
262    pub fn memory_index(
263        &mut self,
264        request: MemoryIndexRequest,
265    ) -> Result<MemoryIndexResult, MemoryIndexError> {
266        let entity = Self::canonical_memory_token(&request.entity)
267            .ok_or(MemoryIndexError::EntityRequired)?;
268        let topic =
269            Self::canonical_memory_token(&request.topic).ok_or(MemoryIndexError::TopicRequired)?;
270        let store = match request.store.as_deref() {
271            None => Self::default_memory_store(),
272            Some(raw_store) => Self::canonical_memory_store(raw_store)
273                .ok_or_else(|| MemoryIndexError::UnsupportedStore(raw_store.trim().to_string()))?,
274        };
275        let fact = request
276            .fact
277            .as_deref()
278            .map(str::trim)
279            .filter(|value| !value.is_empty())
280            .map(ToString::to_string);
281        let conflict = request.conflict.unwrap_or(false);
282        if fact.is_none() && !conflict {
283            return Err(MemoryIndexError::FactRequiredWhenConflictUnset);
284        }
285
286        let previous_memory_assertions = self.memory_assertions.clone();
287        let previous_memory_conflicts = self.memory_conflicts.clone();
288        let previous_memory_sequence = self.memory_sequence;
289
290        let mut assertion_id = None;
291        if let Some(fact) = fact {
292            let assertion_sequence = self.next_memory_sequence();
293            let assertion = MemoryAssertion {
294                assertion_id: format!("memory-assert-{assertion_sequence:06}"),
295                entity: entity.clone(),
296                topic: topic.clone(),
297                store: store.clone(),
298                fact,
299                metadata: request.metadata.clone(),
300                indexed_at_ms: current_time_ms(),
301            };
302            assertion_id = Some(assertion.assertion_id.clone());
303            self.memory_assertions.push(assertion);
304            while self.memory_assertions.len() > MEMORY_ASSERTIONS_MAX_RETAINED {
305                self.memory_assertions.remove(0);
306            }
307        }
308
309        if conflict {
310            let conflict_key = MemoryConflictKey {
311                entity: entity.clone(),
312                topic: topic.clone(),
313                store: store.clone(),
314            };
315            self.memory_conflicts.insert(
316                conflict_key,
317                MemoryConflictSignal {
318                    entity: entity.clone(),
319                    topic: topic.clone(),
320                    store: store.clone(),
321                    reason: request
322                        .conflict_reason
323                        .as_deref()
324                        .map(str::trim)
325                        .filter(|value| !value.is_empty())
326                        .map(ToString::to_string),
327                    updated_at_ms: current_time_ms(),
328                },
329            );
330        }
331        if let Err(error) = self.persist_memory_state() {
332            self.memory_assertions = previous_memory_assertions;
333            self.memory_conflicts = previous_memory_conflicts;
334            self.memory_sequence = previous_memory_sequence;
335            return Err(error);
336        }
337
338        let conflict_active = self
339            .memory_conflict_for_reference(Some(entity.as_str()), Some(topic.as_str()))
340            .is_some();
341
342        Ok(MemoryIndexResult {
343            entity,
344            topic,
345            store,
346            assertion_id,
347            conflict_active,
348        })
349    }
350    pub fn memory_query(&self, request: MemoryQueryRequest) -> MemoryQueryResult {
351        let entity = request
352            .entity
353            .as_deref()
354            .and_then(Self::canonical_memory_token);
355        let topic = request
356            .topic
357            .as_deref()
358            .and_then(Self::canonical_memory_token);
359        let store = request
360            .store
361            .as_deref()
362            .and_then(Self::canonical_memory_store);
363        let assertions = self
364            .memory_assertions
365            .iter()
366            .filter(|assertion| {
367                entity
368                    .as_ref()
369                    .is_none_or(|value| assertion.entity.as_str() == value.as_str())
370            })
371            .filter(|assertion| {
372                topic
373                    .as_ref()
374                    .is_none_or(|value| assertion.topic.as_str() == value.as_str())
375            })
376            .filter(|assertion| {
377                store
378                    .as_ref()
379                    .is_none_or(|value| assertion.store.as_str() == value.as_str())
380            })
381            .cloned()
382            .collect::<Vec<_>>();
383        let conflicts = self
384            .memory_conflicts
385            .values()
386            .filter(|signal| {
387                entity
388                    .as_ref()
389                    .is_none_or(|value| signal.entity.as_str() == value.as_str())
390            })
391            .filter(|signal| {
392                topic
393                    .as_ref()
394                    .is_none_or(|value| signal.topic.as_str() == value.as_str())
395            })
396            .filter(|signal| {
397                store
398                    .as_ref()
399                    .is_none_or(|value| signal.store.as_str() == value.as_str())
400            })
401            .cloned()
402            .collect::<Vec<_>>();
403        MemoryQueryResult {
404            assertions,
405            conflicts,
406        }
407    }
408}