Skip to main content

thingd_core/
in_memory.rs

1//! In-memory storage adapter used for API design and tests.
2
3use std::collections::{BTreeMap, VecDeque};
4
5use crate::model::ListEventsOptions;
6use crate::{
7    EventLog, Link, LinkDirection, LinkQueryOptions, LinkStore, MemoryEvent, MemoryObject,
8    ObjectKey, ObjectStore, QueueClaimOptions, QueueJob, QueueJobStatus, QueueNackOptions,
9    QueueStore, ThingdError, ThingdResult, now_iso_string, u64_to_i64, unix_timestamp_millis,
10};
11
/// In-memory engine used to prove the storage boundary.
///
/// Every store trait implemented for this type reads and writes the plain
/// collections held in the fields below; nothing is persisted.
///
/// # Examples
///
/// ```rust
/// use thingd_core::{MemoryEngine, ObjectStore, EventLog, MemoryObject, MemoryEvent};
///
/// let mut engine = MemoryEngine::new();
///
/// engine.put_object(MemoryObject::new("users", "alice", r#"{"name":"Alice"}"#)).unwrap();
/// engine.append_event(MemoryEvent::new("audit", "login", r#"{"user":"alice"}"#)).unwrap();
///
/// assert_eq!(engine.count_objects().unwrap(), 1);
/// assert_eq!(engine.count_events().unwrap(), 1);
/// ```
#[derive(Default)]
pub struct MemoryEngine {
    /// Stored objects, keyed (and therefore iterated) by `ObjectKey` order.
    objects: BTreeMap<ObjectKey, MemoryObject>,
    /// Events in the order they were appended.
    events: Vec<MemoryEvent>,
    /// Jobs grouped by queue name; each queue keeps its jobs in push order.
    queues: BTreeMap<String, VecDeque<QueueJob>>,
    /// Links in creation order.
    links: Vec<Link>,
    /// Sequence number given to the most recently appended event
    /// (0 until the first event is appended).
    next_event_sequence: u64,
}
35
36impl MemoryEngine {
37    /// Create a new empty in-memory engine.
38    pub fn new() -> Self {
39        Self::default()
40    }
41}
42
43impl ObjectStore for MemoryEngine {
44    fn put_object(&mut self, mut object: MemoryObject) -> ThingdResult<MemoryObject> {
45        let now = now_iso_string();
46        let version = self
47            .objects
48            .get(&object.key)
49            .map_or(1, |existing| existing.version + 1);
50
51        object.version = version;
52        object.updated_at.clone_from(&now);
53        if object.created_at.is_empty() {
54            object.created_at = now;
55        }
56        self.objects.insert(object.key.clone(), object.clone());
57
58        Ok(object)
59    }
60
61    fn get_object(&self, collection: &str, id: &str) -> ThingdResult<Option<MemoryObject>> {
62        Ok(self.objects.get(&ObjectKey::new(collection, id)).cloned())
63    }
64
65    fn list_objects(&self, collections: Option<&[String]>) -> ThingdResult<Vec<MemoryObject>> {
66        let objects = self
67            .objects
68            .values()
69            .filter(|object| {
70                collections.is_none_or(|allowed| allowed.contains(&object.key.collection))
71            })
72            .cloned()
73            .collect();
74
75        Ok(objects)
76    }
77
78    fn delete_object(&mut self, collection: &str, id: &str) -> ThingdResult<bool> {
79        Ok(self
80            .objects
81            .remove(&ObjectKey::new(collection, id))
82            .is_some())
83    }
84
85    fn count_objects(&self) -> ThingdResult<u64> {
86        Ok(self.objects.len() as u64)
87    }
88
89    fn list_collections(&self) -> ThingdResult<Vec<String>> {
90        let mut collections: Vec<String> = self
91            .objects
92            .keys()
93            .map(|key| key.collection.clone())
94            .collect();
95        collections.sort();
96        collections.dedup();
97        Ok(collections)
98    }
99}
100
101impl EventLog for MemoryEngine {
102    fn append_event(&mut self, mut event: MemoryEvent) -> ThingdResult<MemoryEvent> {
103        self.next_event_sequence += 1;
104        event.sequence = self.next_event_sequence;
105        if event.created_at.is_empty() {
106            event.created_at = now_iso_string();
107        }
108        self.events.push(event.clone());
109
110        Ok(event)
111    }
112
113    fn list_events(
114        &self,
115        stream: Option<&str>,
116        options: ListEventsOptions,
117    ) -> ThingdResult<Vec<MemoryEvent>> {
118        let events = self
119            .events
120            .iter()
121            .filter(|event| stream.is_none_or(|target| event.stream == target))
122            .filter(|event| options.from_sequence.is_none_or(|seq| event.sequence > seq))
123            .cloned()
124            .collect::<Vec<_>>();
125
126        Ok(match options.limit {
127            Some(limit) => events
128                .into_iter()
129                .take(usize::try_from(limit).unwrap_or(usize::MAX))
130                .collect(),
131            None => events,
132        })
133    }
134
135    fn count_events(&self) -> ThingdResult<u64> {
136        Ok(self.events.len() as u64)
137    }
138
139    fn list_streams(&self) -> ThingdResult<Vec<String>> {
140        let mut streams: Vec<String> = self
141            .events
142            .iter()
143            .map(|event| event.stream.clone())
144            .collect();
145        streams.sort();
146        streams.dedup();
147        Ok(streams)
148    }
149}
150
151impl QueueStore for MemoryEngine {
152    fn push_job(&mut self, job: QueueJob) -> ThingdResult<QueueJob> {
153        let jobs = self.queues.entry(job.queue.clone()).or_default();
154
155        if let Some(existing) = jobs.iter().find(|candidate| candidate.id == job.id) {
156            return Ok(existing.clone());
157        }
158
159        jobs.push_back(job.clone());
160        Ok(job)
161    }
162
163    fn claim_job_with_options(
164        &mut self,
165        queue: &str,
166        options: QueueClaimOptions,
167    ) -> ThingdResult<Option<QueueJob>> {
168        self.release_expired_leases(queue);
169
170        let Some(jobs) = self.queues.get_mut(queue) else {
171            return Ok(None);
172        };
173
174        let now = unix_timestamp_millis();
175        let Some(job) = jobs.iter_mut().find(|candidate| {
176            candidate.status == QueueJobStatus::Ready && candidate.available_at_ms <= now
177        }) else {
178            return Ok(None);
179        };
180
181        job.status = QueueJobStatus::Leased;
182        job.attempts += 1;
183        job.leased_at_ms = Some(now);
184        job.lease_expires_at_ms = Some(now.saturating_add(u64_to_i64(options.lease_ms)));
185
186        Ok(Some(job.clone()))
187    }
188
189    fn ack_job(&mut self, queue: &str, id: &str) -> ThingdResult<Option<QueueJob>> {
190        let Some(job) = self.find_job_mut(queue, id) else {
191            return Ok(None);
192        };
193
194        if job.status != QueueJobStatus::Leased {
195            return Err(ThingdError::Conflict(format!(
196                "job {id} must be leased before ack"
197            )));
198        }
199
200        job.status = QueueJobStatus::Completed;
201        job.completed_at_ms = Some(unix_timestamp_millis());
202
203        Ok(Some(job.clone()))
204    }
205
206    fn nack_job_with_options(
207        &mut self,
208        queue: &str,
209        id: &str,
210        options: QueueNackOptions,
211    ) -> ThingdResult<Option<QueueJob>> {
212        let Some(job) = self.find_job_mut(queue, id) else {
213            return Ok(None);
214        };
215
216        if job.status != QueueJobStatus::Leased {
217            return Err(ThingdError::Conflict(format!(
218                "job {id} must be leased before nack"
219            )));
220        }
221
222        let now = unix_timestamp_millis();
223        job.leased_at_ms = None;
224        job.lease_expires_at_ms = None;
225        if !options.error.is_empty() {
226            job.last_error = options.error;
227        }
228        job.status = if job.attempts >= job.max_attempts {
229            job.dead_at_ms = Some(now);
230            QueueJobStatus::Dead
231        } else {
232            job.available_at_ms = now.saturating_add(u64_to_i64(options.delay_ms));
233            QueueJobStatus::Ready
234        };
235
236        Ok(Some(job.clone()))
237    }
238
239    fn list_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
240        Ok(self
241            .queues
242            .get(queue)
243            .map_or_else(Vec::new, |jobs| jobs.iter().cloned().collect()))
244    }
245
246    fn list_dead_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
247        Ok(self.queues.get(queue).map_or_else(Vec::new, |jobs| {
248            jobs.iter()
249                .filter(|job| job.status == QueueJobStatus::Dead)
250                .cloned()
251                .collect()
252        }))
253    }
254
255    fn list_queues(&self) -> ThingdResult<Vec<String>> {
256        let mut queues: Vec<String> = self.queues.keys().cloned().collect();
257        queues.sort();
258        Ok(queues)
259    }
260
261    fn count_active_jobs(&self) -> ThingdResult<u64> {
262        let count = self
263            .queues
264            .values()
265            .flat_map(|jobs| jobs.iter())
266            .filter(|job| job.status != QueueJobStatus::Dead)
267            .count();
268        Ok(count as u64)
269    }
270
271    fn count_dead_jobs(&self) -> ThingdResult<u64> {
272        let count = self
273            .queues
274            .values()
275            .flat_map(|jobs| jobs.iter())
276            .filter(|job| job.status == QueueJobStatus::Dead)
277            .count();
278        Ok(count as u64)
279    }
280}
281
282impl MemoryEngine {
283    fn find_job_mut(&mut self, queue: &str, id: &str) -> Option<&mut QueueJob> {
284        self.queues
285            .get_mut(queue)?
286            .iter_mut()
287            .find(|job| job.id == id)
288    }
289
290    fn release_expired_leases(&mut self, queue: &str) {
291        let now = unix_timestamp_millis();
292
293        for job in self.queues.get_mut(queue).into_iter().flatten() {
294            if job.status == QueueJobStatus::Leased
295                && job
296                    .lease_expires_at_ms
297                    .is_some_and(|lease_expires_at_ms| lease_expires_at_ms <= now)
298            {
299                job.status = QueueJobStatus::Ready;
300                job.leased_at_ms = None;
301                job.lease_expires_at_ms = None;
302            }
303        }
304    }
305}
306
307impl crate::store::Searcher for MemoryEngine {
308    fn search(
309        &self,
310        query: &str,
311        options: crate::SearchOptions,
312    ) -> ThingdResult<Vec<crate::SearchHit>> {
313        let query_words: Vec<String> = query
314            .split_whitespace()
315            .map(|w| {
316                w.to_lowercase()
317                    .chars()
318                    .filter(|c| c.is_alphanumeric())
319                    .collect()
320            })
321            .filter(|w: &String| !w.is_empty())
322            .collect();
323
324        if query_words.is_empty() {
325            return Ok(Vec::new());
326        }
327
328        let mut hits = Vec::new();
329
330        // 1. Search objects
331        for object in self.objects.values() {
332            // Apply collection filter
333            if let Some(ref collections) = options.collections
334                && !collections.contains(&object.key.collection)
335            {
336                continue;
337            }
338
339            // Apply metadata filter
340            if let Some(ref filter) = options.filter
341                && !matches_filter_memory(&object.body, filter)
342            {
343                continue;
344            }
345
346            let text_to_search = format!(
347                "{} {} {}",
348                object.key.collection, object.key.id, object.body
349            )
350            .to_lowercase();
351            let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
352
353            if matches_all {
354                hits.push(crate::SearchHit {
355                    kind: "object".to_string(),
356                    collection: object.key.collection.clone(),
357                    id: object.key.id.clone(),
358                    text: object.body.clone(),
359                    score: 1.0,
360                    body: object.body.clone(),
361                    version: Some(object.version),
362                    created_at: object.created_at.clone(),
363                    updated_at: Some(object.updated_at.clone()),
364                    event_type: None,
365                });
366            }
367        }
368
369        // 2. Search events
370        for event in &self.events {
371            // Apply collection filter
372            if let Some(ref collections) = options.collections
373                && !collections.contains(&event.stream)
374            {
375                continue;
376            }
377
378            // Apply metadata filter
379            if let Some(ref filter) = options.filter
380                && !matches_filter_memory(&event.body, filter)
381            {
382                continue;
383            }
384
385            let text_to_search =
386                format!("{} {} {}", event.stream, event.event_type, event.body).to_lowercase();
387            let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
388
389            if matches_all {
390                hits.push(crate::SearchHit {
391                    kind: "event".to_string(),
392                    collection: event.stream.clone(),
393                    id: event.sequence.to_string(),
394                    text: event.body.clone(),
395                    score: 1.0,
396                    body: event.body.clone(),
397                    version: None,
398                    created_at: event.created_at.clone(),
399                    updated_at: None,
400                    event_type: Some(event.event_type.clone()),
401                });
402            }
403        }
404
405        // Limit results if requested
406        if let Some(limit) = options.limit {
407            hits.truncate(limit);
408        }
409
410        Ok(hits)
411    }
412}
413
414impl LinkStore for MemoryEngine {
415    fn create_link(&mut self, mut link: Link) -> ThingdResult<Link> {
416        let id = format!("link-{}", self.links.len() + 1);
417        link.id = id;
418        if link.created_at.is_empty() {
419            link.created_at = now_iso_string();
420        }
421        self.links.push(link.clone());
422        Ok(link)
423    }
424
425    fn delete_link(&mut self, id: &str) -> ThingdResult<bool> {
426        let len_before = self.links.len();
427        self.links.retain(|l| l.id != id);
428        Ok(self.links.len() < len_before)
429    }
430
431    fn get_link(&self, id: &str) -> ThingdResult<Option<Link>> {
432        Ok(self.links.iter().find(|l| l.id == id).cloned())
433    }
434
435    fn get_neighbors(
436        &self,
437        reference: &str,
438        direction: LinkDirection,
439        options: LinkQueryOptions,
440    ) -> ThingdResult<Vec<Link>> {
441        let neighbors: Vec<Link> = self
442            .links
443            .iter()
444            .filter(|link| {
445                let matches_direction = match direction {
446                    LinkDirection::Outgoing => link.from_ref == reference,
447                    LinkDirection::Incoming => link.to_ref == reference,
448                    LinkDirection::Both => link.from_ref == reference || link.to_ref == reference,
449                };
450                let matches_type = options
451                    .link_type
452                    .as_deref()
453                    .is_none_or(|t| link.link_type == t);
454                matches_direction && matches_type
455            })
456            .cloned()
457            .collect();
458
459        Ok(match options.limit {
460            Some(limit) => neighbors.into_iter().take(limit).collect(),
461            None => neighbors,
462        })
463    }
464
465    fn count_links(&self) -> ThingdResult<u64> {
466        Ok(self.links.len() as u64)
467    }
468}
469
470fn matches_filter_memory(body_str: &str, filter: &serde_json::Value) -> bool {
471    let Ok(body) = serde_json::from_str::<serde_json::Value>(body_str) else {
472        return false;
473    };
474
475    let Some(filter_obj) = filter.as_object() else {
476        return true;
477    };
478
479    for (k, v) in filter_obj {
480        if body.get(k) != Some(v) {
481            return false;
482        }
483    }
484    true
485}
486
// Unit tests that drive `MemoryEngine` through the store traits it implements.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SearchOptions;
    use crate::store::Searcher;

    // A first put assigns version 1 and the object can be read back by key.
    #[test]
    fn stores_and_reads_objects() {
        let mut engine = MemoryEngine::new();

        let object = engine
            .put_object(MemoryObject::new(
                "decisions",
                "rust-core",
                "{\"text\":\"Use Rust\"}",
            ))
            .unwrap();

        let stored = engine
            .get_object("decisions", "rust-core")
            .unwrap()
            .unwrap();
        assert_eq!(object.version, 1);
        assert_eq!(stored.key.collection, "decisions");
        assert_eq!(stored.key.id, "rust-core");
    }

    // `None` lists every object; a collection list restricts the result.
    #[test]
    fn lists_objects_with_optional_collection_filter() {
        let mut engine = MemoryEngine::new();

        engine
            .put_object(MemoryObject::new("decisions", "rust-core", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("notes", "agent-guide", "{}"))
            .unwrap();

        let filtered = engine
            .list_objects(Some(&["decisions".to_string()]))
            .unwrap();

        assert_eq!(engine.list_objects(None).unwrap().len(), 2);
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].key.collection, "decisions");
    }

    // The first appended event gets sequence 1 and is listed under its stream.
    #[test]
    fn appends_events_with_sequence_numbers() {
        let mut engine = MemoryEngine::new();

        let event = engine
            .append_event(MemoryEvent::new(
                "project:thingd",
                "decision.made",
                "MCP-native object storage",
            ))
            .unwrap();

        assert_eq!(event.sequence, 1);
        assert_eq!(
            engine
                .list_events(Some("project:thingd"), ListEventsOptions::default())
                .unwrap()
                .len(),
            1
        );
    }

    // Happy path: push -> claim (leased, attempt 1) -> ack (completed).
    #[test]
    fn claims_and_acks_queue_jobs() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        let claimed = engine.claim_job("embed").unwrap().unwrap();
        let acked = engine.ack_job("embed", "job-1").unwrap().unwrap();

        assert_eq!(claimed.status, QueueJobStatus::Leased);
        assert_eq!(claimed.attempts, 1);
        assert_eq!(acked.status, QueueJobStatus::Completed);
    }

    // With max_attempts = 1, a nack after the first claim makes the job dead.
    #[test]
    fn nacks_jobs_to_dead_letter_after_max_attempts() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 1))
            .unwrap();

        engine.claim_job("embed").unwrap().unwrap();
        let nacked = engine.nack_job("embed", "job-1").unwrap().unwrap();

        assert_eq!(nacked.status, QueueJobStatus::Dead);
        assert_eq!(engine.list_dead_jobs("embed").unwrap().len(), 1);
    }

    // A job pushed with a 60 s delay must not be claimable right away.
    #[test]
    fn does_not_claim_delayed_jobs_before_available() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3).delay_by_ms(60_000))
            .unwrap();

        assert!(engine.claim_job("embed").unwrap().is_none());
    }

    // A zero-length lease has already expired by the next claim, so the same
    // job is leased again and its attempt counter reaches 2.
    #[test]
    fn reclaims_jobs_after_lease_expiration() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        let first = engine
            .claim_job_with_options("embed", QueueClaimOptions::new(0))
            .unwrap()
            .unwrap();
        let second = engine.claim_job("embed").unwrap().unwrap();

        assert_eq!(first.status, QueueJobStatus::Leased);
        assert_eq!(second.status, QueueJobStatus::Leased);
        assert_eq!(second.attempts, 2);
    }

    // A nack with a retry delay returns the job to Ready but keeps it
    // unclaimable until the delay has passed.
    #[test]
    fn nacks_jobs_with_retry_delay() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        engine.claim_job("embed").unwrap().unwrap();
        let retried = engine
            .nack_job_with_options("embed", "job-1", QueueNackOptions::new(60_000))
            .unwrap()
            .unwrap();

        assert_eq!(retried.status, QueueJobStatus::Ready);
        assert!(engine.claim_job("embed").unwrap().is_none());
    }

    // All counters start at zero and follow puts, appends, pushes, and a
    // job moving to the dead state.
    #[test]
    fn counts_objects_events_and_jobs() {
        let mut engine = MemoryEngine::new();

        assert_eq!(engine.count_objects().unwrap(), 0);
        assert_eq!(engine.count_events().unwrap(), 0);
        assert_eq!(engine.count_active_jobs().unwrap(), 0);
        assert_eq!(engine.count_dead_jobs().unwrap(), 0);

        engine
            .put_object(MemoryObject::new("col-a", "o1", r#"{"v":1}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-a", "o2", r#"{"v":2}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-b", "o3", r#"{"v":3}"#))
            .unwrap();
        assert_eq!(engine.count_objects().unwrap(), 3);

        engine
            .append_event(MemoryEvent::new("s1", "t1", "e1"))
            .unwrap();
        engine
            .append_event(MemoryEvent::new("s1", "t2", "e2"))
            .unwrap();
        engine
            .append_event(MemoryEvent::new("s2", "t3", "e3"))
            .unwrap();
        assert_eq!(engine.count_events().unwrap(), 3);

        engine
            .push_job(QueueJob::new("work", "j1", "p1", 3))
            .unwrap();
        engine
            .push_job(QueueJob::new("work", "j2", "p2", 3))
            .unwrap();
        engine
            .push_job(QueueJob::new("other", "j3", "p3", 1))
            .unwrap();
        assert_eq!(engine.count_active_jobs().unwrap(), 3);

        // j3 allows a single attempt, so claim + nack makes it dead.
        engine.claim_job("other").unwrap();
        engine.nack_job("other", "j3").unwrap();
        assert_eq!(engine.count_dead_jobs().unwrap(), 1);
        assert_eq!(engine.count_active_jobs().unwrap(), 2);
    }

    // Name listings are empty at first, then sorted and free of duplicates.
    #[test]
    fn lists_collections_streams_and_queues() {
        let mut engine = MemoryEngine::new();

        assert!(engine.list_collections().unwrap().is_empty());
        assert!(engine.list_streams().unwrap().is_empty());
        assert!(engine.list_queues().unwrap().is_empty());

        engine
            .put_object(MemoryObject::new("col-a", "x", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-b", "y", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-a", "z", "{}"))
            .unwrap();
        let collections = engine.list_collections().unwrap();
        assert_eq!(collections, vec!["col-a", "col-b"]);

        engine
            .append_event(MemoryEvent::new("s1", "t", "e1"))
            .unwrap();
        engine
            .append_event(MemoryEvent::new("s2", "t", "e2"))
            .unwrap();
        let streams = engine.list_streams().unwrap();
        assert_eq!(streams, vec!["s1", "s2"]);

        // Pushed as "work" then "jobs"; the listing comes back sorted.
        engine
            .push_job(QueueJob::new("work", "j1", "p1", 3))
            .unwrap();
        engine
            .push_job(QueueJob::new("jobs", "j2", "p2", 3))
            .unwrap();
        let queues = engine.list_queues().unwrap();
        assert_eq!(queues, vec!["jobs", "work"]);
    }

    // Covers plain matching, `limit`, and the `collections` restriction.
    // The JSON `filter` option is not exercised by this test.
    #[test]
    fn search_respects_filter_and_limit() {
        let mut engine = MemoryEngine::new();

        engine
            .put_object(MemoryObject::new(
                "docs",
                "a",
                r#"{"text":"hello world","tag":"greeting"}"#,
            ))
            .unwrap();
        engine
            .put_object(MemoryObject::new(
                "docs",
                "b",
                r#"{"text":"hello there","tag":"greeting"}"#,
            ))
            .unwrap();
        engine
            .put_object(MemoryObject::new(
                "docs",
                "c",
                r#"{"text":"goodbye world","tag":"farewell"}"#,
            ))
            .unwrap();

        let all = engine.search("world", SearchOptions::default()).unwrap();
        assert_eq!(all.len(), 2);

        let limited = engine
            .search(
                "world",
                SearchOptions {
                    limit: Some(1),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(limited.len(), 1);

        let filtered = engine
            .search(
                "hello",
                SearchOptions {
                    collections: Some(vec!["docs".into()]),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(filtered.len(), 2);
    }
}
773}