Skip to main content

thingd_core/
in_memory.rs

1//! In-memory storage adapter used for API design and tests.
2
3use std::collections::{BTreeMap, VecDeque};
4
5use crate::model::{ListEventsOptions, ListObjectsOptions};
6use crate::{
7    EventLog, Link, LinkDirection, LinkQueryOptions, LinkStore, MemoryEvent, MemoryObject,
8    ObjectKey, ObjectStore, QueueClaimOptions, QueueJob, QueueJobStatus, QueueNackOptions,
9    QueueStore, ThingdError, ThingdResult, now_iso_string, u64_to_i64, unix_timestamp_millis,
10};
11
12/// In-memory engine used to prove the storage boundary.
13///
14/// # Examples
15///
16/// ```rust
17/// use thingd_core::{MemoryEngine, ObjectStore, EventLog, MemoryObject, MemoryEvent};
18///
19/// let mut engine = MemoryEngine::new();
20///
21/// engine.put_object(MemoryObject::new("users", "alice", r#"{"name":"Alice"}"#)).unwrap();
22/// engine.append_event(MemoryEvent::new("audit", "login", r#"{"user":"alice"}"#)).unwrap();
23///
24/// assert_eq!(engine.count_objects().unwrap(), 1);
25/// assert_eq!(engine.count_events().unwrap(), 1);
26/// ```
27#[derive(Default)]
28pub struct MemoryEngine {
29    objects: BTreeMap<ObjectKey, MemoryObject>,
30    events: Vec<MemoryEvent>,
31    queues: BTreeMap<String, VecDeque<QueueJob>>,
32    links: Vec<Link>,
33    next_event_sequence: u64,
34    next_link_id: u64,
35}
36
37impl MemoryEngine {
38    /// Create a new empty in-memory engine.
39    pub fn new() -> Self {
40        Self::default()
41    }
42}
43
44impl ObjectStore for MemoryEngine {
45    fn put_object(&mut self, mut object: MemoryObject) -> ThingdResult<MemoryObject> {
46        let now = now_iso_string();
47        let version = self
48            .objects
49            .get(&object.key)
50            .map_or(1, |existing| existing.version + 1);
51
52        object.version = version;
53        object.updated_at.clone_from(&now);
54        if object.created_at.is_empty() {
55            object.created_at = now;
56        }
57        self.objects.insert(object.key.clone(), object.clone());
58
59        Ok(object)
60    }
61
62    fn get_object(&self, collection: &str, id: &str) -> ThingdResult<Option<MemoryObject>> {
63        Ok(self.objects.get(&ObjectKey::new(collection, id)).cloned())
64    }
65
66    fn list_objects(
67        &self,
68        collections: Option<&[String]>,
69        options: &ListObjectsOptions,
70    ) -> ThingdResult<Vec<MemoryObject>> {
71        let mut objects: Vec<MemoryObject> =
72            self.objects
73                .values()
74                .filter(|object| {
75                    collections.is_none_or(|allowed| allowed.contains(&object.key.collection))
76                })
77                .filter(|object| {
78                    if options.filter.is_empty() {
79                        return true;
80                    }
81                    let Ok(body) = serde_json::from_str::<serde_json::Value>(&object.body) else {
82                        return false;
83                    };
84                    options.filter.iter().all(|(key, expected)| {
85                        body.get(key.as_str()).is_some_and(|v| v == expected)
86                    })
87                })
88                .cloned()
89                .collect();
90
91        if let Some(offset) = options.offset {
92            let skip = usize::try_from(offset).unwrap_or(usize::MAX);
93            objects = objects.into_iter().skip(skip).collect();
94        }
95        if let Some(limit) = options.limit {
96            let take = usize::try_from(limit).unwrap_or(usize::MAX);
97            objects.truncate(take);
98        }
99
100        Ok(objects)
101    }
102
103    fn delete_object(&mut self, collection: &str, id: &str) -> ThingdResult<bool> {
104        Ok(self
105            .objects
106            .remove(&ObjectKey::new(collection, id))
107            .is_some())
108    }
109
110    fn count_objects(&self) -> ThingdResult<u64> {
111        Ok(self.objects.len() as u64)
112    }
113
114    fn list_collections(&self) -> ThingdResult<Vec<String>> {
115        let mut collections: Vec<String> = self
116            .objects
117            .keys()
118            .map(|key| key.collection.clone())
119            .collect();
120        collections.sort();
121        collections.dedup();
122        Ok(collections)
123    }
124}
125
126impl EventLog for MemoryEngine {
127    fn append_event(&mut self, mut event: MemoryEvent) -> ThingdResult<MemoryEvent> {
128        self.next_event_sequence += 1;
129        event.sequence = self.next_event_sequence;
130        if event.created_at.is_empty() {
131            event.created_at = now_iso_string();
132        }
133        self.events.push(event.clone());
134
135        Ok(event)
136    }
137
138    fn list_events(
139        &self,
140        stream: Option<&str>,
141        options: ListEventsOptions,
142    ) -> ThingdResult<Vec<MemoryEvent>> {
143        let events = self
144            .events
145            .iter()
146            .filter(|event| stream.is_none_or(|target| event.stream == target))
147            .filter(|event| options.from_sequence.is_none_or(|seq| event.sequence > seq))
148            .cloned()
149            .collect::<Vec<_>>();
150
151        Ok(match options.limit {
152            Some(limit) => events
153                .into_iter()
154                .take(usize::try_from(limit).unwrap_or(usize::MAX))
155                .collect(),
156            None => events,
157        })
158    }
159
160    fn count_events(&self) -> ThingdResult<u64> {
161        Ok(self.events.len() as u64)
162    }
163
164    fn list_streams(&self) -> ThingdResult<Vec<String>> {
165        let mut streams: Vec<String> = self
166            .events
167            .iter()
168            .map(|event| event.stream.clone())
169            .collect();
170        streams.sort();
171        streams.dedup();
172        Ok(streams)
173    }
174}
175
176impl QueueStore for MemoryEngine {
177    fn push_job(&mut self, job: QueueJob) -> ThingdResult<QueueJob> {
178        let jobs = self.queues.entry(job.queue.clone()).or_default();
179
180        if let Some(existing) = jobs.iter().find(|candidate| candidate.id == job.id) {
181            return Ok(existing.clone());
182        }
183
184        jobs.push_back(job.clone());
185        Ok(job)
186    }
187
188    fn claim_job_with_options(
189        &mut self,
190        queue: &str,
191        options: QueueClaimOptions,
192    ) -> ThingdResult<Option<QueueJob>> {
193        self.release_expired_leases(queue);
194
195        let Some(jobs) = self.queues.get_mut(queue) else {
196            return Ok(None);
197        };
198
199        let now = unix_timestamp_millis();
200        let Some(job) = jobs.iter_mut().find(|candidate| {
201            candidate.status == QueueJobStatus::Ready && candidate.available_at_ms <= now
202        }) else {
203            return Ok(None);
204        };
205
206        job.status = QueueJobStatus::Leased;
207        job.attempts += 1;
208        job.leased_at_ms = Some(now);
209        job.lease_expires_at_ms = Some(now.saturating_add(u64_to_i64(options.lease_ms)));
210
211        Ok(Some(job.clone()))
212    }
213
214    fn ack_job(&mut self, queue: &str, id: &str) -> ThingdResult<Option<QueueJob>> {
215        let Some(job) = self.find_job_mut(queue, id) else {
216            return Ok(None);
217        };
218
219        if job.status != QueueJobStatus::Leased {
220            return Err(ThingdError::Conflict(format!(
221                "job {id} must be leased before ack"
222            )));
223        }
224
225        job.status = QueueJobStatus::Completed;
226        job.completed_at_ms = Some(unix_timestamp_millis());
227
228        Ok(Some(job.clone()))
229    }
230
231    fn nack_job_with_options(
232        &mut self,
233        queue: &str,
234        id: &str,
235        options: QueueNackOptions,
236    ) -> ThingdResult<Option<QueueJob>> {
237        let Some(job) = self.find_job_mut(queue, id) else {
238            return Ok(None);
239        };
240
241        if job.status != QueueJobStatus::Leased {
242            return Err(ThingdError::Conflict(format!(
243                "job {id} must be leased before nack"
244            )));
245        }
246
247        let now = unix_timestamp_millis();
248        job.leased_at_ms = None;
249        job.lease_expires_at_ms = None;
250        if !options.error.is_empty() {
251            job.last_error = options.error;
252        }
253        job.status = if job.attempts >= job.max_attempts {
254            job.dead_at_ms = Some(now);
255            QueueJobStatus::Dead
256        } else {
257            job.available_at_ms = now.saturating_add(u64_to_i64(options.delay_ms));
258            QueueJobStatus::Ready
259        };
260
261        Ok(Some(job.clone()))
262    }
263
264    fn list_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
265        Ok(self
266            .queues
267            .get(queue)
268            .map_or_else(Vec::new, |jobs| jobs.iter().cloned().collect()))
269    }
270
271    fn list_dead_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
272        Ok(self.queues.get(queue).map_or_else(Vec::new, |jobs| {
273            jobs.iter()
274                .filter(|job| job.status == QueueJobStatus::Dead)
275                .cloned()
276                .collect()
277        }))
278    }
279
280    fn list_queues(&self) -> ThingdResult<Vec<String>> {
281        let mut queues: Vec<String> = self.queues.keys().cloned().collect();
282        queues.sort();
283        Ok(queues)
284    }
285
286    fn count_active_jobs(&self) -> ThingdResult<u64> {
287        let count = self
288            .queues
289            .values()
290            .flat_map(|jobs| jobs.iter())
291            .filter(|job| job.status != QueueJobStatus::Dead)
292            .count();
293        Ok(count as u64)
294    }
295
296    fn count_dead_jobs(&self) -> ThingdResult<u64> {
297        let count = self
298            .queues
299            .values()
300            .flat_map(|jobs| jobs.iter())
301            .filter(|job| job.status == QueueJobStatus::Dead)
302            .count();
303        Ok(count as u64)
304    }
305}
306
307impl MemoryEngine {
308    fn find_job_mut(&mut self, queue: &str, id: &str) -> Option<&mut QueueJob> {
309        self.queues
310            .get_mut(queue)?
311            .iter_mut()
312            .find(|job| job.id == id)
313    }
314
315    fn release_expired_leases(&mut self, queue: &str) {
316        let now = unix_timestamp_millis();
317
318        for job in self.queues.get_mut(queue).into_iter().flatten() {
319            if job.status == QueueJobStatus::Leased
320                && job
321                    .lease_expires_at_ms
322                    .is_some_and(|lease_expires_at_ms| lease_expires_at_ms <= now)
323            {
324                job.status = QueueJobStatus::Ready;
325                job.leased_at_ms = None;
326                job.lease_expires_at_ms = None;
327            }
328        }
329    }
330}
331
332impl crate::store::Searcher for MemoryEngine {
333    fn search(
334        &self,
335        query: &str,
336        options: crate::SearchOptions,
337    ) -> ThingdResult<Vec<crate::SearchHit>> {
338        let query_words: Vec<String> = query
339            .split_whitespace()
340            .map(|w| {
341                w.to_lowercase()
342                    .chars()
343                    .filter(|c| c.is_alphanumeric())
344                    .collect()
345            })
346            .filter(|w: &String| !w.is_empty())
347            .collect();
348
349        if query_words.is_empty() {
350            return Ok(Vec::new());
351        }
352
353        let mut hits = Vec::new();
354
355        // 1. Search objects
356        for object in self.objects.values() {
357            // Apply collection filter
358            if let Some(ref collections) = options.collections
359                && !collections.contains(&object.key.collection)
360            {
361                continue;
362            }
363
364            // Apply metadata filter
365            if let Some(ref filter) = options.filter
366                && !matches_filter_memory(&object.body, filter)
367            {
368                continue;
369            }
370
371            let text_to_search = format!(
372                "{} {} {}",
373                object.key.collection, object.key.id, object.body
374            )
375            .to_lowercase();
376            let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
377
378            if matches_all {
379                hits.push(crate::SearchHit {
380                    kind: "object".to_string(),
381                    collection: object.key.collection.clone(),
382                    id: object.key.id.clone(),
383                    text: object.body.clone(),
384                    score: 1.0,
385                    body: object.body.clone(),
386                    version: Some(object.version),
387                    created_at: object.created_at.clone(),
388                    updated_at: Some(object.updated_at.clone()),
389                    event_type: None,
390                });
391            }
392        }
393
394        // 2. Search events
395        for event in &self.events {
396            // Apply collection filter
397            if let Some(ref collections) = options.collections
398                && !collections.contains(&event.stream)
399            {
400                continue;
401            }
402
403            // Apply metadata filter
404            if let Some(ref filter) = options.filter
405                && !matches_filter_memory(&event.body, filter)
406            {
407                continue;
408            }
409
410            let text_to_search =
411                format!("{} {} {}", event.stream, event.event_type, event.body).to_lowercase();
412            let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
413
414            if matches_all {
415                hits.push(crate::SearchHit {
416                    kind: "event".to_string(),
417                    collection: event.stream.clone(),
418                    id: event.sequence.to_string(),
419                    text: event.body.clone(),
420                    score: 1.0,
421                    body: event.body.clone(),
422                    version: None,
423                    created_at: event.created_at.clone(),
424                    updated_at: None,
425                    event_type: Some(event.event_type.clone()),
426                });
427            }
428        }
429
430        // Limit results if requested
431        if let Some(limit) = options.limit {
432            hits.truncate(limit);
433        }
434
435        Ok(hits)
436    }
437}
438
439impl LinkStore for MemoryEngine {
440    fn create_link(&mut self, mut link: Link) -> ThingdResult<Link> {
441        self.next_link_id += 1;
442        link.id = format!("link-{}", self.next_link_id);
443        if link.created_at.is_empty() {
444            link.created_at = now_iso_string();
445        }
446        self.links.push(link.clone());
447        Ok(link)
448    }
449
450    fn delete_link(&mut self, id: &str) -> ThingdResult<bool> {
451        let len_before = self.links.len();
452        self.links.retain(|l| l.id != id);
453        Ok(self.links.len() < len_before)
454    }
455
456    fn get_link(&self, id: &str) -> ThingdResult<Option<Link>> {
457        Ok(self.links.iter().find(|l| l.id == id).cloned())
458    }
459
460    fn get_neighbors(
461        &self,
462        reference: &str,
463        direction: LinkDirection,
464        options: LinkQueryOptions,
465    ) -> ThingdResult<Vec<Link>> {
466        let neighbors: Vec<Link> = self
467            .links
468            .iter()
469            .filter(|link| {
470                let matches_direction = match direction {
471                    LinkDirection::Outgoing => link.from_ref == reference,
472                    LinkDirection::Incoming => link.to_ref == reference,
473                    LinkDirection::Both => link.from_ref == reference || link.to_ref == reference,
474                };
475                let matches_type = options
476                    .link_type
477                    .as_deref()
478                    .is_none_or(|t| link.link_type == t);
479                matches_direction && matches_type
480            })
481            .cloned()
482            .collect();
483
484        Ok(match options.limit {
485            Some(limit) => neighbors.into_iter().take(limit).collect(),
486            None => neighbors,
487        })
488    }
489
490    fn count_links(&self) -> ThingdResult<u64> {
491        Ok(self.links.len() as u64)
492    }
493}
494
495fn matches_filter_memory(body_str: &str, filter: &serde_json::Value) -> bool {
496    let Ok(body) = serde_json::from_str::<serde_json::Value>(body_str) else {
497        return false;
498    };
499
500    let Some(filter_obj) = filter.as_object() else {
501        return true;
502    };
503
504    for (k, v) in filter_obj {
505        if body.get(k) != Some(v) {
506            return false;
507        }
508    }
509    true
510}
511
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::{LinkStore, Searcher};
    use crate::{Link, ListObjectsOptions, SearchOptions};

    #[test]
    fn stores_and_reads_objects() {
        let mut engine = MemoryEngine::new();

        let object = engine
            .put_object(MemoryObject::new(
                "decisions",
                "rust-core",
                "{\"text\":\"Use Rust\"}",
            ))
            .unwrap();

        let stored = engine
            .get_object("decisions", "rust-core")
            .unwrap()
            .unwrap();
        assert_eq!(object.version, 1);
        assert_eq!(stored.key.collection, "decisions");
        assert_eq!(stored.key.id, "rust-core");
    }

    #[test]
    fn lists_objects_with_optional_collection_filter() {
        let mut engine = MemoryEngine::new();

        engine
            .put_object(MemoryObject::new("decisions", "rust-core", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("notes", "agent-guide", "{}"))
            .unwrap();

        let filtered = engine
            .list_objects(
                Some(&["decisions".to_string()]),
                &ListObjectsOptions::default(),
            )
            .unwrap();

        assert_eq!(
            engine
                .list_objects(None, &ListObjectsOptions::default())
                .unwrap()
                .len(),
            2
        );
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].key.collection, "decisions");
    }

    #[test]
    fn appends_events_with_sequence_numbers() {
        let mut engine = MemoryEngine::new();

        let event = engine
            .append_event(MemoryEvent::new(
                "project:thingd",
                "decision.made",
                "MCP-native object storage",
            ))
            .unwrap();

        assert_eq!(event.sequence, 1);
        assert_eq!(
            engine
                .list_events(Some("project:thingd"), ListEventsOptions::default())
                .unwrap()
                .len(),
            1
        );
    }

    #[test]
    fn claims_and_acks_queue_jobs() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        let claimed = engine.claim_job("embed").unwrap().unwrap();
        let acked = engine.ack_job("embed", "job-1").unwrap().unwrap();

        assert_eq!(claimed.status, QueueJobStatus::Leased);
        assert_eq!(claimed.attempts, 1);
        assert_eq!(acked.status, QueueJobStatus::Completed);
    }

    #[test]
    fn nacks_jobs_to_dead_letter_after_max_attempts() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 1))
            .unwrap();

        engine.claim_job("embed").unwrap().unwrap();
        let nacked = engine.nack_job("embed", "job-1").unwrap().unwrap();

        assert_eq!(nacked.status, QueueJobStatus::Dead);
        assert_eq!(engine.list_dead_jobs("embed").unwrap().len(), 1);
    }

    #[test]
    fn does_not_claim_delayed_jobs_before_available() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3).delay_by_ms(60_000))
            .unwrap();

        assert!(engine.claim_job("embed").unwrap().is_none());
    }

    #[test]
    fn reclaims_jobs_after_lease_expiration() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        // A zero-length lease is already expired when the next claim runs.
        let first = engine
            .claim_job_with_options("embed", QueueClaimOptions::new(0))
            .unwrap()
            .unwrap();
        let second = engine.claim_job("embed").unwrap().unwrap();

        assert_eq!(first.status, QueueJobStatus::Leased);
        assert_eq!(second.status, QueueJobStatus::Leased);
        assert_eq!(second.attempts, 2);
    }

    #[test]
    fn nacks_jobs_with_retry_delay() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        engine.claim_job("embed").unwrap().unwrap();
        let retried = engine
            .nack_job_with_options("embed", "job-1", QueueNackOptions::new(60_000))
            .unwrap()
            .unwrap();

        assert_eq!(retried.status, QueueJobStatus::Ready);
        assert!(engine.claim_job("embed").unwrap().is_none());
    }

    #[test]
    fn counts_objects_events_and_jobs() {
        let mut engine = MemoryEngine::new();

        assert_eq!(engine.count_objects().unwrap(), 0);
        assert_eq!(engine.count_events().unwrap(), 0);
        assert_eq!(engine.count_active_jobs().unwrap(), 0);
        assert_eq!(engine.count_dead_jobs().unwrap(), 0);

        engine
            .put_object(MemoryObject::new("col-a", "o1", r#"{"v":1}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-a", "o2", r#"{"v":2}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-b", "o3", r#"{"v":3}"#))
            .unwrap();
        assert_eq!(engine.count_objects().unwrap(), 3);

        engine
            .append_event(MemoryEvent::new("s1", "t1", "e1"))
            .unwrap();
        engine
            .append_event(MemoryEvent::new("s1", "t2", "e2"))
            .unwrap();
        engine
            .append_event(MemoryEvent::new("s2", "t3", "e3"))
            .unwrap();
        assert_eq!(engine.count_events().unwrap(), 3);

        engine
            .push_job(QueueJob::new("work", "j1", "p1", 3))
            .unwrap();
        engine
            .push_job(QueueJob::new("work", "j2", "p2", 3))
            .unwrap();
        engine
            .push_job(QueueJob::new("other", "j3", "p3", 1))
            .unwrap();
        assert_eq!(engine.count_active_jobs().unwrap(), 3);

        // `j3` allows a single attempt, so one claim + nack sends it to the dead letter.
        engine.claim_job("other").unwrap();
        engine.nack_job("other", "j3").unwrap();
        assert_eq!(engine.count_dead_jobs().unwrap(), 1);
        assert_eq!(engine.count_active_jobs().unwrap(), 2);
    }

    #[test]
    fn lists_collections_streams_and_queues() {
        let mut engine = MemoryEngine::new();

        assert!(engine.list_collections().unwrap().is_empty());
        assert!(engine.list_streams().unwrap().is_empty());
        assert!(engine.list_queues().unwrap().is_empty());

        engine
            .put_object(MemoryObject::new("col-a", "x", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-b", "y", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-a", "z", "{}"))
            .unwrap();
        let collections = engine.list_collections().unwrap();
        assert_eq!(collections, vec!["col-a", "col-b"]);

        engine
            .append_event(MemoryEvent::new("s1", "t", "e1"))
            .unwrap();
        engine
            .append_event(MemoryEvent::new("s2", "t", "e2"))
            .unwrap();
        let streams = engine.list_streams().unwrap();
        assert_eq!(streams, vec!["s1", "s2"]);

        engine
            .push_job(QueueJob::new("work", "j1", "p1", 3))
            .unwrap();
        engine
            .push_job(QueueJob::new("jobs", "j2", "p2", 3))
            .unwrap();
        let queues = engine.list_queues().unwrap();
        assert_eq!(queues, vec!["jobs", "work"]);
    }

    #[test]
    fn search_respects_filter_and_limit() {
        let mut engine = MemoryEngine::new();

        engine
            .put_object(MemoryObject::new(
                "docs",
                "a",
                r#"{"text":"hello world","tag":"greeting"}"#,
            ))
            .unwrap();
        engine
            .put_object(MemoryObject::new(
                "docs",
                "b",
                r#"{"text":"hello there","tag":"greeting"}"#,
            ))
            .unwrap();
        engine
            .put_object(MemoryObject::new(
                "docs",
                "c",
                r#"{"text":"goodbye world","tag":"farewell"}"#,
            ))
            .unwrap();

        let all = engine.search("world", SearchOptions::default()).unwrap();
        assert_eq!(all.len(), 2);

        let limited = engine
            .search(
                "world",
                SearchOptions {
                    limit: Some(1),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(limited.len(), 1);

        let filtered = engine
            .search(
                "hello",
                SearchOptions {
                    collections: Some(vec!["docs".into()]),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(filtered.len(), 2);
    }

    // ── list_objects: filter / limit / offset ─────────────────────────────

    #[test]
    fn list_objects_filter_returns_matching_objects() {
        let mut engine = MemoryEngine::new();
        engine
            .put_object(MemoryObject::new("w", "a", r#"{"color":"red","size":1}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "b", r#"{"color":"blue","size":2}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "c", r#"{"color":"red","size":3}"#))
            .unwrap();

        let opts = ListObjectsOptions {
            filter: vec![("color".into(), serde_json::json!("red"))],
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["w".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|o| o.body.contains("\"red\"")));
    }

    #[test]
    fn list_objects_filter_no_match_returns_empty() {
        let mut engine = MemoryEngine::new();
        engine
            .put_object(MemoryObject::new("w", "a", r#"{"color":"red"}"#))
            .unwrap();

        let opts = ListObjectsOptions {
            filter: vec![("color".into(), serde_json::json!("green"))],
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["w".to_string()]), &opts)
            .unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn list_objects_limit_truncates_results() {
        let mut engine = MemoryEngine::new();
        for i in 0..5u32 {
            engine
                .put_object(MemoryObject::new("col", format!("id-{i}"), "{}"))
                .unwrap();
        }

        let opts = ListObjectsOptions {
            limit: Some(3),
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["col".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 3);
    }

    #[test]
    fn list_objects_offset_skips_results() {
        let mut engine = MemoryEngine::new();
        for i in 0..5u32 {
            engine
                .put_object(MemoryObject::new("col", format!("id-{i}"), "{}"))
                .unwrap();
        }

        let opts = ListObjectsOptions {
            offset: Some(3),
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["col".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn list_objects_filter_and_limit_combined() {
        let mut engine = MemoryEngine::new();
        for i in 0..4u32 {
            engine
                .put_object(MemoryObject::new(
                    "col",
                    format!("id-{i}"),
                    r#"{"status":"active"}"#,
                ))
                .unwrap();
        }
        engine
            .put_object(MemoryObject::new("col", "id-4", r#"{"status":"inactive"}"#))
            .unwrap();

        let opts = ListObjectsOptions {
            filter: vec![("status".into(), serde_json::json!("active"))],
            limit: Some(2),
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["col".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|o| o.body.contains("active")));
    }

    // ── append_event: RETURNING gives correct sequence + created_at ────────

    #[test]
    fn append_event_returns_sequence_and_timestamp() {
        let mut engine = MemoryEngine::new();

        let first = engine
            .append_event(MemoryEvent::new("s", "ev.first", r#"{"x":1}"#))
            .unwrap();
        let second = engine
            .append_event(MemoryEvent::new("s", "ev.second", r#"{"x":2}"#))
            .unwrap();

        assert_eq!(first.sequence, 1);
        assert_eq!(second.sequence, 2);
        assert!(!first.created_at.is_empty(), "created_at should be set");
    }

    // ── create_link: monotonic IDs survive deletes ─────────────────────────

    #[test]
    fn create_link_ids_are_unique_after_delete() {
        let mut engine = MemoryEngine::new();
        engine
            .put_object(MemoryObject::new("n", "a", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("n", "b", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("n", "c", "{}"))
            .unwrap();

        let l1 = engine
            .create_link(Link::new("n/a", "connects", "n/b"))
            .unwrap();
        let l2 = engine
            .create_link(Link::new("n/b", "connects", "n/c"))
            .unwrap();

        // Delete the first link — the ID counter must NOT reset.
        engine.delete_link(&l1.id).unwrap();

        let l3 = engine
            .create_link(Link::new("n/a", "connects", "n/c"))
            .unwrap();

        // Compare the numeric suffix rather than the strings themselves: string order
        // is lexicographic, so "link-10" would sort before "link-9".
        let sequence_of = |id: &str| -> u64 {
            id.strip_prefix("link-")
                .and_then(|suffix| suffix.parse().ok())
                .expect("link ids have the form link-<n>")
        };

        assert_ne!(l3.id, l2.id, "IDs must not collide after a delete");
        assert!(
            sequence_of(&l3.id) > sequence_of(&l2.id),
            "IDs must be monotonically increasing"
        );
    }
}