Skip to main content

thingd_core/
in_memory.rs

1//! In-memory storage adapter used for API design and tests.
2
3use std::collections::{BTreeMap, VecDeque};
4
5use crate::model::{ListEventsOptions, ListObjectsOptions};
6use crate::{
7    EventLog, Link, LinkDirection, LinkQueryOptions, LinkStore, MemoryEvent, MemoryObject,
8    ObjectKey, ObjectStore, QueueClaimOptions, QueueJob, QueueJobStatus, QueueNackOptions,
9    QueueStore, ThingdError, ThingdResult, now_iso_string, u64_to_i64, unix_timestamp_millis,
10};
11
12/// In-memory engine used to prove the storage boundary.
13///
14/// # Examples
15///
16/// ```rust
17/// use thingd_core::{MemoryEngine, ObjectStore, EventLog, MemoryObject, MemoryEvent};
18///
19/// let mut engine = MemoryEngine::new();
20///
21/// engine.put_object(MemoryObject::new("users", "alice", r#"{"name":"Alice"}"#)).unwrap();
22/// engine.append_event(MemoryEvent::new("audit", "login", r#"{"user":"alice"}"#)).unwrap();
23///
24/// assert_eq!(engine.count_objects().unwrap(), 1);
25/// assert_eq!(engine.count_events().unwrap(), 1);
26/// ```
27#[derive(Default)]
28pub struct MemoryEngine {
29    objects: BTreeMap<ObjectKey, MemoryObject>,
30    events: Vec<MemoryEvent>,
31    queues: BTreeMap<String, VecDeque<QueueJob>>,
32    links: Vec<Link>,
33    next_event_sequence: u64,
34    next_link_id: u64,
35}
36
37impl MemoryEngine {
38    /// Create a new empty in-memory engine.
39    pub fn new() -> Self {
40        Self::default()
41    }
42}
43
44impl ObjectStore for MemoryEngine {
45    fn put_object(&mut self, mut object: MemoryObject) -> ThingdResult<MemoryObject> {
46        let now = now_iso_string();
47        let version = self
48            .objects
49            .get(&object.key)
50            .map_or(1, |existing| existing.version + 1);
51
52        object.version = version;
53        object.updated_at.clone_from(&now);
54        if object.created_at.is_empty() {
55            object.created_at = now;
56        }
57        self.objects.insert(object.key.clone(), object.clone());
58
59        Ok(object)
60    }
61
62    fn get_object(&self, collection: &str, id: &str) -> ThingdResult<Option<MemoryObject>> {
63        Ok(self.objects.get(&ObjectKey::new(collection, id)).cloned())
64    }
65
66    fn list_objects(
67        &self,
68        collections: Option<&[String]>,
69        options: &ListObjectsOptions,
70    ) -> ThingdResult<Vec<MemoryObject>> {
71        let mut objects: Vec<MemoryObject> =
72            self.objects
73                .values()
74                .filter(|object| {
75                    collections.is_none_or(|allowed| allowed.contains(&object.key.collection))
76                })
77                .filter(|object| {
78                    if options.filter.is_empty() {
79                        return true;
80                    }
81                    let Ok(body) = serde_json::from_str::<serde_json::Value>(&object.body) else {
82                        return false;
83                    };
84                    options.filter.iter().all(|(key, expected)| {
85                        body.get(key.as_str()).is_some_and(|v| v == expected)
86                    })
87                })
88                .cloned()
89                .collect();
90
91        // Apply sort if requested
92        if let Some(ref sort_by) = options.sort_by {
93            use crate::model::SortDirection;
94            let asc = sort_by.direction == SortDirection::Asc;
95            objects.sort_by(|a, b| {
96                let cmp = match sort_by.field.as_str() {
97                    "id" => a.key.id.cmp(&b.key.id),
98                    "collection" => a.key.collection.cmp(&b.key.collection),
99                    "created_at" => a.created_at.cmp(&b.created_at),
100                    "updated_at" => a.updated_at.cmp(&b.updated_at),
101                    "version" => a.version.cmp(&b.version),
102                    _ => std::cmp::Ordering::Equal,
103                };
104                if asc { cmp } else { cmp.reverse() }
105            });
106        }
107
108        if let Some(offset) = options.offset {
109            let skip = usize::try_from(offset).unwrap_or(usize::MAX);
110            objects = objects.into_iter().skip(skip).collect();
111        }
112        if let Some(limit) = options.limit {
113            let take = usize::try_from(limit).unwrap_or(usize::MAX);
114            objects.truncate(take);
115        }
116
117        Ok(objects)
118    }
119
120    fn delete_object(&mut self, collection: &str, id: &str) -> ThingdResult<bool> {
121        Ok(self
122            .objects
123            .remove(&ObjectKey::new(collection, id))
124            .is_some())
125    }
126
127    fn count_objects(&self) -> ThingdResult<u64> {
128        Ok(self.objects.len() as u64)
129    }
130
131    fn list_collections(&self) -> ThingdResult<Vec<String>> {
132        let mut collections: Vec<String> = self
133            .objects
134            .keys()
135            .map(|key| key.collection.clone())
136            .collect();
137        collections.sort();
138        collections.dedup();
139        Ok(collections)
140    }
141}
142
143impl EventLog for MemoryEngine {
144    fn append_event(&mut self, mut event: MemoryEvent) -> ThingdResult<MemoryEvent> {
145        self.next_event_sequence += 1;
146        event.sequence = self.next_event_sequence;
147        if event.created_at.is_empty() {
148            event.created_at = now_iso_string();
149        }
150        self.events.push(event.clone());
151
152        Ok(event)
153    }
154
155    fn list_events(
156        &self,
157        stream: Option<&str>,
158        options: ListEventsOptions,
159    ) -> ThingdResult<Vec<MemoryEvent>> {
160        let events = self
161            .events
162            .iter()
163            .filter(|event| stream.is_none_or(|target| event.stream == target))
164            .filter(|event| options.from_sequence.is_none_or(|seq| event.sequence > seq))
165            .cloned()
166            .collect::<Vec<_>>();
167
168        Ok(match options.limit {
169            Some(limit) => events
170                .into_iter()
171                .take(usize::try_from(limit).unwrap_or(usize::MAX))
172                .collect(),
173            None => events,
174        })
175    }
176
177    fn count_events(&self) -> ThingdResult<u64> {
178        Ok(self.events.len() as u64)
179    }
180
181    fn list_streams(&self) -> ThingdResult<Vec<String>> {
182        let mut streams: Vec<String> = self
183            .events
184            .iter()
185            .map(|event| event.stream.clone())
186            .collect();
187        streams.sort();
188        streams.dedup();
189        Ok(streams)
190    }
191}
192
193impl QueueStore for MemoryEngine {
194    fn push_job(&mut self, job: QueueJob) -> ThingdResult<QueueJob> {
195        let jobs = self.queues.entry(job.queue.clone()).or_default();
196
197        if let Some(existing) = jobs.iter().find(|candidate| candidate.id == job.id) {
198            return Ok(existing.clone());
199        }
200
201        jobs.push_back(job.clone());
202        Ok(job)
203    }
204
205    fn claim_job_with_options(
206        &mut self,
207        queue: &str,
208        options: QueueClaimOptions,
209    ) -> ThingdResult<Option<QueueJob>> {
210        self.release_expired_leases(queue);
211
212        let Some(jobs) = self.queues.get_mut(queue) else {
213            return Ok(None);
214        };
215
216        let now = unix_timestamp_millis();
217        let Some(job) = jobs.iter_mut().find(|candidate| {
218            candidate.status == QueueJobStatus::Ready && candidate.available_at_ms <= now
219        }) else {
220            return Ok(None);
221        };
222
223        job.status = QueueJobStatus::Leased;
224        job.attempts += 1;
225        job.leased_at_ms = Some(now);
226        job.lease_expires_at_ms = Some(now.saturating_add(u64_to_i64(options.lease_ms)));
227
228        Ok(Some(job.clone()))
229    }
230
231    fn ack_job(&mut self, queue: &str, id: &str) -> ThingdResult<Option<QueueJob>> {
232        let Some(job) = self.find_job_mut(queue, id) else {
233            return Ok(None);
234        };
235
236        if job.status != QueueJobStatus::Leased {
237            return Err(ThingdError::Conflict(format!(
238                "job {id} must be leased before ack"
239            )));
240        }
241
242        job.status = QueueJobStatus::Completed;
243        job.completed_at_ms = Some(unix_timestamp_millis());
244
245        Ok(Some(job.clone()))
246    }
247
248    fn nack_job_with_options(
249        &mut self,
250        queue: &str,
251        id: &str,
252        options: QueueNackOptions,
253    ) -> ThingdResult<Option<QueueJob>> {
254        let Some(job) = self.find_job_mut(queue, id) else {
255            return Ok(None);
256        };
257
258        if job.status != QueueJobStatus::Leased {
259            return Err(ThingdError::Conflict(format!(
260                "job {id} must be leased before nack"
261            )));
262        }
263
264        let now = unix_timestamp_millis();
265        job.leased_at_ms = None;
266        job.lease_expires_at_ms = None;
267        if !options.error.is_empty() {
268            job.last_error = options.error;
269        }
270        job.status = if job.attempts >= job.max_attempts {
271            job.dead_at_ms = Some(now);
272            QueueJobStatus::Dead
273        } else {
274            job.available_at_ms = now.saturating_add(u64_to_i64(options.delay_ms));
275            QueueJobStatus::Ready
276        };
277
278        Ok(Some(job.clone()))
279    }
280
281    fn list_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
282        Ok(self
283            .queues
284            .get(queue)
285            .map_or_else(Vec::new, |jobs| jobs.iter().cloned().collect()))
286    }
287
288    fn list_dead_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
289        Ok(self.queues.get(queue).map_or_else(Vec::new, |jobs| {
290            jobs.iter()
291                .filter(|job| job.status == QueueJobStatus::Dead)
292                .cloned()
293                .collect()
294        }))
295    }
296
297    fn list_queues(&self) -> ThingdResult<Vec<String>> {
298        let mut queues: Vec<String> = self.queues.keys().cloned().collect();
299        queues.sort();
300        Ok(queues)
301    }
302
303    fn count_active_jobs(&self) -> ThingdResult<u64> {
304        let count = self
305            .queues
306            .values()
307            .flat_map(|jobs| jobs.iter())
308            .filter(|job| job.status != QueueJobStatus::Dead)
309            .count();
310        Ok(count as u64)
311    }
312
313    fn count_dead_jobs(&self) -> ThingdResult<u64> {
314        let count = self
315            .queues
316            .values()
317            .flat_map(|jobs| jobs.iter())
318            .filter(|job| job.status == QueueJobStatus::Dead)
319            .count();
320        Ok(count as u64)
321    }
322}
323
324impl MemoryEngine {
325    fn find_job_mut(&mut self, queue: &str, id: &str) -> Option<&mut QueueJob> {
326        self.queues
327            .get_mut(queue)?
328            .iter_mut()
329            .find(|job| job.id == id)
330    }
331
332    fn release_expired_leases(&mut self, queue: &str) {
333        let now = unix_timestamp_millis();
334
335        for job in self.queues.get_mut(queue).into_iter().flatten() {
336            if job.status == QueueJobStatus::Leased
337                && job
338                    .lease_expires_at_ms
339                    .is_some_and(|lease_expires_at_ms| lease_expires_at_ms <= now)
340            {
341                job.status = QueueJobStatus::Ready;
342                job.leased_at_ms = None;
343                job.lease_expires_at_ms = None;
344            }
345        }
346    }
347}
348
349impl crate::store::Searcher for MemoryEngine {
350    fn search(
351        &self,
352        query: &str,
353        options: crate::SearchOptions,
354    ) -> ThingdResult<Vec<crate::SearchHit>> {
355        let query_words: Vec<String> = query
356            .split_whitespace()
357            .map(|w| {
358                w.to_lowercase()
359                    .chars()
360                    .filter(|c| c.is_alphanumeric())
361                    .collect()
362            })
363            .filter(|w: &String| !w.is_empty())
364            .collect();
365
366        if query_words.is_empty() {
367            return Ok(Vec::new());
368        }
369
370        let mut hits = Vec::new();
371
372        // 1. Search objects
373        for object in self.objects.values() {
374            // Apply collection filter
375            if let Some(ref collections) = options.collections
376                && !collections.contains(&object.key.collection)
377            {
378                continue;
379            }
380
381            // Apply metadata filter
382            if let Some(ref filter) = options.filter
383                && !matches_filter_memory(&object.body, filter)
384            {
385                continue;
386            }
387
388            let text_to_search = format!(
389                "{} {} {}",
390                object.key.collection, object.key.id, object.body
391            )
392            .to_lowercase();
393            let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
394
395            if matches_all {
396                hits.push(crate::SearchHit {
397                    kind: "object".to_string(),
398                    collection: object.key.collection.clone(),
399                    id: object.key.id.clone(),
400                    text: object.body.clone(),
401                    score: 1.0,
402                    body: object.body.clone(),
403                    version: Some(object.version),
404                    created_at: object.created_at.clone(),
405                    updated_at: Some(object.updated_at.clone()),
406                    event_type: None,
407                });
408            }
409        }
410
411        // 2. Search events
412        for event in &self.events {
413            // Apply collection filter
414            if let Some(ref collections) = options.collections
415                && !collections.contains(&event.stream)
416            {
417                continue;
418            }
419
420            // Apply metadata filter
421            if let Some(ref filter) = options.filter
422                && !matches_filter_memory(&event.body, filter)
423            {
424                continue;
425            }
426
427            let text_to_search =
428                format!("{} {} {}", event.stream, event.event_type, event.body).to_lowercase();
429            let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
430
431            if matches_all {
432                hits.push(crate::SearchHit {
433                    kind: "event".to_string(),
434                    collection: event.stream.clone(),
435                    id: event.sequence.to_string(),
436                    text: event.body.clone(),
437                    score: 1.0,
438                    body: event.body.clone(),
439                    version: None,
440                    created_at: event.created_at.clone(),
441                    updated_at: None,
442                    event_type: Some(event.event_type.clone()),
443                });
444            }
445        }
446
447        // Limit results if requested
448        if let Some(limit) = options.limit {
449            hits.truncate(limit);
450        }
451
452        Ok(hits)
453    }
454}
455
456impl LinkStore for MemoryEngine {
457    fn create_link(&mut self, mut link: Link) -> ThingdResult<Link> {
458        self.next_link_id += 1;
459        link.id = format!("link-{}", self.next_link_id);
460        if link.created_at.is_empty() {
461            link.created_at = now_iso_string();
462        }
463        self.links.push(link.clone());
464        Ok(link)
465    }
466
467    fn delete_link(&mut self, id: &str) -> ThingdResult<bool> {
468        let len_before = self.links.len();
469        self.links.retain(|l| l.id != id);
470        Ok(self.links.len() < len_before)
471    }
472
473    fn get_link(&self, id: &str) -> ThingdResult<Option<Link>> {
474        Ok(self.links.iter().find(|l| l.id == id).cloned())
475    }
476
477    fn get_neighbors(
478        &self,
479        reference: &str,
480        direction: LinkDirection,
481        options: LinkQueryOptions,
482    ) -> ThingdResult<Vec<Link>> {
483        let neighbors: Vec<Link> = self
484            .links
485            .iter()
486            .filter(|link| {
487                let matches_direction = match direction {
488                    LinkDirection::Outgoing => link.from_ref == reference,
489                    LinkDirection::Incoming => link.to_ref == reference,
490                    LinkDirection::Both => link.from_ref == reference || link.to_ref == reference,
491                };
492                let matches_type = options
493                    .link_type
494                    .as_deref()
495                    .is_none_or(|t| link.link_type == t);
496                matches_direction && matches_type
497            })
498            .cloned()
499            .collect();
500
501        Ok(match options.limit {
502            Some(limit) => neighbors.into_iter().take(limit).collect(),
503            None => neighbors,
504        })
505    }
506
507    fn count_links(&self) -> ThingdResult<u64> {
508        Ok(self.links.len() as u64)
509    }
510}
511
512fn matches_filter_memory(body_str: &str, filter: &serde_json::Value) -> bool {
513    let Ok(body) = serde_json::from_str::<serde_json::Value>(body_str) else {
514        return false;
515    };
516
517    let Some(filter_obj) = filter.as_object() else {
518        return true;
519    };
520
521    for (k, v) in filter_obj {
522        if body.get(k) != Some(v) {
523            return false;
524        }
525    }
526    true
527}
528
529#[cfg(test)]
530mod tests {
531    use super::*;
532    use crate::store::{LinkStore, Searcher};
533    use crate::{Link, ListObjectsOptions, SearchOptions};
534
535    #[test]
536    fn stores_and_reads_objects() {
537        let mut engine = MemoryEngine::new();
538
539        let object = engine
540            .put_object(MemoryObject::new(
541                "decisions",
542                "rust-core",
543                "{\"text\":\"Use Rust\"}",
544            ))
545            .unwrap();
546
547        let stored = engine
548            .get_object("decisions", "rust-core")
549            .unwrap()
550            .unwrap();
551        assert_eq!(object.version, 1);
552        assert_eq!(stored.key.collection, "decisions");
553        assert_eq!(stored.key.id, "rust-core");
554    }
555
556    #[test]
557    fn lists_objects_with_optional_collection_filter() {
558        let mut engine = MemoryEngine::new();
559
560        engine
561            .put_object(MemoryObject::new("decisions", "rust-core", "{}"))
562            .unwrap();
563        engine
564            .put_object(MemoryObject::new("notes", "agent-guide", "{}"))
565            .unwrap();
566
567        let filtered = engine
568            .list_objects(
569                Some(&["decisions".to_string()]),
570                &ListObjectsOptions::default(),
571            )
572            .unwrap();
573
574        assert_eq!(
575            engine
576                .list_objects(None, &ListObjectsOptions::default())
577                .unwrap()
578                .len(),
579            2
580        );
581        assert_eq!(filtered.len(), 1);
582        assert_eq!(filtered[0].key.collection, "decisions");
583    }
584
585    #[test]
586    fn appends_events_with_sequence_numbers() {
587        let mut engine = MemoryEngine::new();
588
589        let event = engine
590            .append_event(MemoryEvent::new(
591                "project:thingd",
592                "decision.made",
593                "MCP-native object storage",
594            ))
595            .unwrap();
596
597        assert_eq!(event.sequence, 1);
598        assert_eq!(
599            engine
600                .list_events(Some("project:thingd"), ListEventsOptions::default())
601                .unwrap()
602                .len(),
603            1
604        );
605    }
606
607    #[test]
608    fn claims_and_acks_queue_jobs() {
609        let mut engine = MemoryEngine::new();
610
611        engine
612            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
613            .unwrap();
614
615        let claimed = engine.claim_job("embed").unwrap().unwrap();
616        let acked = engine.ack_job("embed", "job-1").unwrap().unwrap();
617
618        assert_eq!(claimed.status, QueueJobStatus::Leased);
619        assert_eq!(claimed.attempts, 1);
620        assert_eq!(acked.status, QueueJobStatus::Completed);
621    }
622
623    #[test]
624    fn nacks_jobs_to_dead_letter_after_max_attempts() {
625        let mut engine = MemoryEngine::new();
626
627        engine
628            .push_job(QueueJob::new("embed", "job-1", "doc-1", 1))
629            .unwrap();
630
631        engine.claim_job("embed").unwrap().unwrap();
632        let nacked = engine.nack_job("embed", "job-1").unwrap().unwrap();
633
634        assert_eq!(nacked.status, QueueJobStatus::Dead);
635        assert_eq!(engine.list_dead_jobs("embed").unwrap().len(), 1);
636    }
637
638    #[test]
639    fn does_not_claim_delayed_jobs_before_available() {
640        let mut engine = MemoryEngine::new();
641
642        engine
643            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3).delay_by_ms(60_000))
644            .unwrap();
645
646        assert!(engine.claim_job("embed").unwrap().is_none());
647    }
648
649    #[test]
650    fn reclaims_jobs_after_lease_expiration() {
651        let mut engine = MemoryEngine::new();
652
653        engine
654            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
655            .unwrap();
656
657        let first = engine
658            .claim_job_with_options("embed", QueueClaimOptions::new(0))
659            .unwrap()
660            .unwrap();
661        let second = engine.claim_job("embed").unwrap().unwrap();
662
663        assert_eq!(first.status, QueueJobStatus::Leased);
664        assert_eq!(second.status, QueueJobStatus::Leased);
665        assert_eq!(second.attempts, 2);
666    }
667
668    #[test]
669    fn nacks_jobs_with_retry_delay() {
670        let mut engine = MemoryEngine::new();
671
672        engine
673            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
674            .unwrap();
675
676        engine.claim_job("embed").unwrap().unwrap();
677        let retried = engine
678            .nack_job_with_options("embed", "job-1", QueueNackOptions::new(60_000))
679            .unwrap()
680            .unwrap();
681
682        assert_eq!(retried.status, QueueJobStatus::Ready);
683        assert!(engine.claim_job("embed").unwrap().is_none());
684    }
685
686    #[test]
687    fn counts_objects_events_and_jobs() {
688        let mut engine = MemoryEngine::new();
689
690        assert_eq!(engine.count_objects().unwrap(), 0);
691        assert_eq!(engine.count_events().unwrap(), 0);
692        assert_eq!(engine.count_active_jobs().unwrap(), 0);
693        assert_eq!(engine.count_dead_jobs().unwrap(), 0);
694
695        engine
696            .put_object(MemoryObject::new("col-a", "o1", r#"{"v":1}"#))
697            .unwrap();
698        engine
699            .put_object(MemoryObject::new("col-a", "o2", r#"{"v":2}"#))
700            .unwrap();
701        engine
702            .put_object(MemoryObject::new("col-b", "o3", r#"{"v":3}"#))
703            .unwrap();
704        assert_eq!(engine.count_objects().unwrap(), 3);
705
706        engine
707            .append_event(MemoryEvent::new("s1", "t1", "e1"))
708            .unwrap();
709        engine
710            .append_event(MemoryEvent::new("s1", "t2", "e2"))
711            .unwrap();
712        engine
713            .append_event(MemoryEvent::new("s2", "t3", "e3"))
714            .unwrap();
715        assert_eq!(engine.count_events().unwrap(), 3);
716
717        engine
718            .push_job(QueueJob::new("work", "j1", "p1", 3))
719            .unwrap();
720        engine
721            .push_job(QueueJob::new("work", "j2", "p2", 3))
722            .unwrap();
723        engine
724            .push_job(QueueJob::new("other", "j3", "p3", 1))
725            .unwrap();
726        assert_eq!(engine.count_active_jobs().unwrap(), 3);
727
728        engine.claim_job("other").unwrap();
729        engine.nack_job("other", "j3").unwrap();
730        assert_eq!(engine.count_dead_jobs().unwrap(), 1);
731        assert_eq!(engine.count_active_jobs().unwrap(), 2);
732    }
733
734    #[test]
735    fn lists_collections_streams_and_queues() {
736        let mut engine = MemoryEngine::new();
737
738        assert!(engine.list_collections().unwrap().is_empty());
739        assert!(engine.list_streams().unwrap().is_empty());
740        assert!(engine.list_queues().unwrap().is_empty());
741
742        engine
743            .put_object(MemoryObject::new("col-a", "x", "{}"))
744            .unwrap();
745        engine
746            .put_object(MemoryObject::new("col-b", "y", "{}"))
747            .unwrap();
748        engine
749            .put_object(MemoryObject::new("col-a", "z", "{}"))
750            .unwrap();
751        let collections = engine.list_collections().unwrap();
752        assert_eq!(collections, vec!["col-a", "col-b"]);
753
754        engine
755            .append_event(MemoryEvent::new("s1", "t", "e1"))
756            .unwrap();
757        engine
758            .append_event(MemoryEvent::new("s2", "t", "e2"))
759            .unwrap();
760        let streams = engine.list_streams().unwrap();
761        assert_eq!(streams, vec!["s1", "s2"]);
762
763        engine
764            .push_job(QueueJob::new("work", "j1", "p1", 3))
765            .unwrap();
766        engine
767            .push_job(QueueJob::new("jobs", "j2", "p2", 3))
768            .unwrap();
769        let queues = engine.list_queues().unwrap();
770        assert_eq!(queues, vec!["jobs", "work"]);
771    }
772
773    #[test]
774    fn search_respects_filter_and_limit() {
775        let mut engine = MemoryEngine::new();
776
777        engine
778            .put_object(MemoryObject::new(
779                "docs",
780                "a",
781                r#"{"text":"hello world","tag":"greeting"}"#,
782            ))
783            .unwrap();
784        engine
785            .put_object(MemoryObject::new(
786                "docs",
787                "b",
788                r#"{"text":"hello there","tag":"greeting"}"#,
789            ))
790            .unwrap();
791        engine
792            .put_object(MemoryObject::new(
793                "docs",
794                "c",
795                r#"{"text":"goodbye world","tag":"farewell"}"#,
796            ))
797            .unwrap();
798
799        let all = engine.search("world", SearchOptions::default()).unwrap();
800        assert_eq!(all.len(), 2);
801
802        let limited = engine
803            .search(
804                "world",
805                SearchOptions {
806                    limit: Some(1),
807                    ..Default::default()
808                },
809            )
810            .unwrap();
811        assert_eq!(limited.len(), 1);
812
813        let filtered = engine
814            .search(
815                "hello",
816                SearchOptions {
817                    collections: Some(vec!["docs".into()]),
818                    ..Default::default()
819                },
820            )
821            .unwrap();
822        assert_eq!(filtered.len(), 2);
823    }
824
825    // ── list_objects: filter / limit / offset ─────────────────────────────
826
827    #[test]
828    fn list_objects_filter_returns_matching_objects() {
829        let mut engine = MemoryEngine::new();
830        engine
831            .put_object(MemoryObject::new("w", "a", r#"{"color":"red","size":1}"#))
832            .unwrap();
833        engine
834            .put_object(MemoryObject::new("w", "b", r#"{"color":"blue","size":2}"#))
835            .unwrap();
836        engine
837            .put_object(MemoryObject::new("w", "c", r#"{"color":"red","size":3}"#))
838            .unwrap();
839
840        let opts = ListObjectsOptions {
841            filter: vec![("color".into(), serde_json::json!("red"))],
842            ..Default::default()
843        };
844        let results = engine
845            .list_objects(Some(&["w".to_string()]), &opts)
846            .unwrap();
847        assert_eq!(results.len(), 2);
848        assert!(results.iter().all(|o| o.body.contains("\"red\"")));
849    }
850
851    #[test]
852    fn list_objects_filter_no_match_returns_empty() {
853        let mut engine = MemoryEngine::new();
854        engine
855            .put_object(MemoryObject::new("w", "a", r#"{"color":"red"}"#))
856            .unwrap();
857
858        let opts = ListObjectsOptions {
859            filter: vec![("color".into(), serde_json::json!("green"))],
860            ..Default::default()
861        };
862        let results = engine
863            .list_objects(Some(&["w".to_string()]), &opts)
864            .unwrap();
865        assert!(results.is_empty());
866    }
867
868    #[test]
869    fn list_objects_limit_truncates_results() {
870        let mut engine = MemoryEngine::new();
871        for i in 0..5u32 {
872            engine
873                .put_object(MemoryObject::new("col", format!("id-{i}"), "{}"))
874                .unwrap();
875        }
876
877        let opts = ListObjectsOptions {
878            limit: Some(3),
879            ..Default::default()
880        };
881        let results = engine
882            .list_objects(Some(&["col".to_string()]), &opts)
883            .unwrap();
884        assert_eq!(results.len(), 3);
885    }
886
887    #[test]
888    fn list_objects_offset_skips_results() {
889        let mut engine = MemoryEngine::new();
890        for i in 0..5u32 {
891            engine
892                .put_object(MemoryObject::new("col", format!("id-{i}"), "{}"))
893                .unwrap();
894        }
895
896        let opts = ListObjectsOptions {
897            offset: Some(3),
898            ..Default::default()
899        };
900        let results = engine
901            .list_objects(Some(&["col".to_string()]), &opts)
902            .unwrap();
903        assert_eq!(results.len(), 2);
904    }
905
906    #[test]
907    fn list_objects_filter_and_limit_combined() {
908        let mut engine = MemoryEngine::new();
909        for i in 0..4u32 {
910            engine
911                .put_object(MemoryObject::new(
912                    "col",
913                    format!("id-{i}"),
914                    r#"{"status":"active"}"#,
915                ))
916                .unwrap();
917        }
918        engine
919            .put_object(MemoryObject::new("col", "id-4", r#"{"status":"inactive"}"#))
920            .unwrap();
921
922        let opts = ListObjectsOptions {
923            filter: vec![("status".into(), serde_json::json!("active"))],
924            limit: Some(2),
925            ..Default::default()
926        };
927        let results = engine
928            .list_objects(Some(&["col".to_string()]), &opts)
929            .unwrap();
930        assert_eq!(results.len(), 2);
931        assert!(results.iter().all(|o| o.body.contains("active")));
932    }
933
934    // ── append_event: RETURNING gives correct sequence + created_at ────────
935
936    #[test]
937    fn append_event_returns_sequence_and_timestamp() {
938        let mut engine = MemoryEngine::new();
939
940        let first = engine
941            .append_event(MemoryEvent::new("s", "ev.first", r#"{"x":1}"#))
942            .unwrap();
943        let second = engine
944            .append_event(MemoryEvent::new("s", "ev.second", r#"{"x":2}"#))
945            .unwrap();
946
947        assert_eq!(first.sequence, 1);
948        assert_eq!(second.sequence, 2);
949        assert!(!first.created_at.is_empty(), "created_at should be set");
950    }
951
952    // ── create_link: monotonic IDs survive deletes ─────────────────────────
953
954    #[test]
955    fn create_link_ids_are_unique_after_delete() {
956        let mut engine = MemoryEngine::new();
957        engine
958            .put_object(MemoryObject::new("n", "a", "{}"))
959            .unwrap();
960        engine
961            .put_object(MemoryObject::new("n", "b", "{}"))
962            .unwrap();
963        engine
964            .put_object(MemoryObject::new("n", "c", "{}"))
965            .unwrap();
966
967        let l1 = engine
968            .create_link(Link::new("n/a", "connects", "n/b"))
969            .unwrap();
970        let l2 = engine
971            .create_link(Link::new("n/b", "connects", "n/c"))
972            .unwrap();
973
974        // Delete the first link — the ID counter must NOT reset.
975        engine.delete_link(&l1.id).unwrap();
976
977        let l3 = engine
978            .create_link(Link::new("n/a", "connects", "n/c"))
979            .unwrap();
980
981        assert_ne!(l3.id, l2.id, "IDs must not collide after a delete");
982        assert!(l3.id > l2.id, "IDs must be monotonically increasing");
983    }
984
985    // ── list_objects: sort by created_at DESC ──────────────────────────────
986
987    #[test]
988    fn list_objects_sort_by_created_at_desc() {
989        let mut engine = MemoryEngine::new();
990        engine
991            .put_object(MemoryObject::new("w", "a", r#"{"x":1}"#))
992            .unwrap();
993        engine
994            .put_object(MemoryObject::new("w", "b", r#"{"x":2}"#))
995            .unwrap();
996        engine
997            .put_object(MemoryObject::new("w", "c", r#"{"x":3}"#))
998            .unwrap();
999
1000        let opts = ListObjectsOptions {
1001            sort_by: Some(crate::SortBy::desc("created_at")),
1002            ..Default::default()
1003        };
1004        let results = engine
1005            .list_objects(Some(&["w".to_string()]), &opts)
1006            .unwrap();
1007        assert_eq!(results.len(), 3);
1008    }
1009
1010    // ── list_objects: sort by id ASC ───────────────────────────────────────
1011
1012    #[test]
1013    fn list_objects_sort_by_id_asc() {
1014        let mut engine = MemoryEngine::new();
1015        engine
1016            .put_object(MemoryObject::new("w", "c", r#"{"x":3}"#))
1017            .unwrap();
1018        engine
1019            .put_object(MemoryObject::new("w", "a", r#"{"x":1}"#))
1020            .unwrap();
1021        engine
1022            .put_object(MemoryObject::new("w", "b", r#"{"x":2}"#))
1023            .unwrap();
1024
1025        let opts = ListObjectsOptions {
1026            sort_by: Some(crate::SortBy::asc("id")),
1027            ..Default::default()
1028        };
1029        let results = engine
1030            .list_objects(Some(&["w".to_string()]), &opts)
1031            .unwrap();
1032        assert_eq!(results.len(), 3);
1033        assert_eq!(results[0].key.id, "a");
1034        assert_eq!(results[1].key.id, "b");
1035        assert_eq!(results[2].key.id, "c");
1036    }
1037
1038    // ── delete_objects_batch ───────────────────────────────────────────────
1039
1040    #[test]
1041    fn delete_objects_batch_deletes_multiple() {
1042        let mut engine = MemoryEngine::new();
1043        engine
1044            .put_object(MemoryObject::new("w", "a", "{}"))
1045            .unwrap();
1046        engine
1047            .put_object(MemoryObject::new("w", "b", "{}"))
1048            .unwrap();
1049        engine
1050            .put_object(MemoryObject::new("w", "c", "{}"))
1051            .unwrap();
1052
1053        let keys = vec![
1054            ("w".to_string(), "a".to_string()),
1055            ("w".to_string(), "b".to_string()),
1056        ];
1057        let deleted = engine.delete_objects_batch(&keys).unwrap();
1058        assert_eq!(deleted, 2);
1059        assert_eq!(engine.count_objects().unwrap(), 1);
1060        assert!(engine.get_object("w", "a").unwrap().is_none());
1061        assert!(engine.get_object("w", "b").unwrap().is_none());
1062        assert!(engine.get_object("w", "c").unwrap().is_some());
1063    }
1064
1065    // ── put_object_with_options: index=false ───────────────────────────────
1066
1067    #[test]
1068    fn put_object_with_options_skip_index() {
1069        let mut engine = MemoryEngine::new();
1070        let opts = crate::PutObjectOptions { index: false };
1071
1072        engine
1073            .put_object_with_options(MemoryObject::new("w", "a", r#"{"text":"hello"}"#), opts)
1074            .unwrap();
1075
1076        let obj = engine.get_object("w", "a").unwrap();
1077        assert!(obj.is_some());
1078        assert_eq!(obj.unwrap().body, r#"{"text":"hello"}"#);
1079    }
1080}