1use std::collections::{BTreeMap, VecDeque};
4
5use crate::model::{ListEventsOptions, ListObjectsOptions};
6use crate::{
7 EventLog, Link, LinkDirection, LinkQueryOptions, LinkStore, MemoryEvent, MemoryObject,
8 ObjectKey, ObjectStore, QueueClaimOptions, QueueJob, QueueJobStatus, QueueNackOptions,
9 QueueStore, ThingdError, ThingdResult, now_iso_string, u64_to_i64, unix_timestamp_millis,
10};
11
12#[derive(Default)]
28pub struct MemoryEngine {
29 objects: BTreeMap<ObjectKey, MemoryObject>,
30 events: Vec<MemoryEvent>,
31 queues: BTreeMap<String, VecDeque<QueueJob>>,
32 links: Vec<Link>,
33 next_event_sequence: u64,
34 next_link_id: u64,
35}
36
37impl MemoryEngine {
38 pub fn new() -> Self {
40 Self::default()
41 }
42}
43
44impl ObjectStore for MemoryEngine {
45 fn put_object(&mut self, mut object: MemoryObject) -> ThingdResult<MemoryObject> {
46 let now = now_iso_string();
47 let version = self
48 .objects
49 .get(&object.key)
50 .map_or(1, |existing| existing.version + 1);
51
52 object.version = version;
53 object.updated_at.clone_from(&now);
54 if object.created_at.is_empty() {
55 object.created_at = now;
56 }
57 self.objects.insert(object.key.clone(), object.clone());
58
59 Ok(object)
60 }
61
62 fn get_object(&self, collection: &str, id: &str) -> ThingdResult<Option<MemoryObject>> {
63 Ok(self.objects.get(&ObjectKey::new(collection, id)).cloned())
64 }
65
66 fn list_objects(
67 &self,
68 collections: Option<&[String]>,
69 options: &ListObjectsOptions,
70 ) -> ThingdResult<Vec<MemoryObject>> {
71 let mut objects: Vec<MemoryObject> =
72 self.objects
73 .values()
74 .filter(|object| {
75 collections.is_none_or(|allowed| allowed.contains(&object.key.collection))
76 })
77 .filter(|object| {
78 if options.filter.is_empty() {
79 return true;
80 }
81 let Ok(body) = serde_json::from_str::<serde_json::Value>(&object.body) else {
82 return false;
83 };
84 options.filter.iter().all(|(key, expected)| {
85 body.get(key.as_str()).is_some_and(|v| v == expected)
86 })
87 })
88 .cloned()
89 .collect();
90
91 if let Some(offset) = options.offset {
92 let skip = usize::try_from(offset).unwrap_or(usize::MAX);
93 objects = objects.into_iter().skip(skip).collect();
94 }
95 if let Some(limit) = options.limit {
96 let take = usize::try_from(limit).unwrap_or(usize::MAX);
97 objects.truncate(take);
98 }
99
100 Ok(objects)
101 }
102
103 fn delete_object(&mut self, collection: &str, id: &str) -> ThingdResult<bool> {
104 Ok(self
105 .objects
106 .remove(&ObjectKey::new(collection, id))
107 .is_some())
108 }
109
110 fn count_objects(&self) -> ThingdResult<u64> {
111 Ok(self.objects.len() as u64)
112 }
113
114 fn list_collections(&self) -> ThingdResult<Vec<String>> {
115 let mut collections: Vec<String> = self
116 .objects
117 .keys()
118 .map(|key| key.collection.clone())
119 .collect();
120 collections.sort();
121 collections.dedup();
122 Ok(collections)
123 }
124}
125
126impl EventLog for MemoryEngine {
127 fn append_event(&mut self, mut event: MemoryEvent) -> ThingdResult<MemoryEvent> {
128 self.next_event_sequence += 1;
129 event.sequence = self.next_event_sequence;
130 if event.created_at.is_empty() {
131 event.created_at = now_iso_string();
132 }
133 self.events.push(event.clone());
134
135 Ok(event)
136 }
137
138 fn list_events(
139 &self,
140 stream: Option<&str>,
141 options: ListEventsOptions,
142 ) -> ThingdResult<Vec<MemoryEvent>> {
143 let events = self
144 .events
145 .iter()
146 .filter(|event| stream.is_none_or(|target| event.stream == target))
147 .filter(|event| options.from_sequence.is_none_or(|seq| event.sequence > seq))
148 .cloned()
149 .collect::<Vec<_>>();
150
151 Ok(match options.limit {
152 Some(limit) => events
153 .into_iter()
154 .take(usize::try_from(limit).unwrap_or(usize::MAX))
155 .collect(),
156 None => events,
157 })
158 }
159
160 fn count_events(&self) -> ThingdResult<u64> {
161 Ok(self.events.len() as u64)
162 }
163
164 fn list_streams(&self) -> ThingdResult<Vec<String>> {
165 let mut streams: Vec<String> = self
166 .events
167 .iter()
168 .map(|event| event.stream.clone())
169 .collect();
170 streams.sort();
171 streams.dedup();
172 Ok(streams)
173 }
174}
175
176impl QueueStore for MemoryEngine {
177 fn push_job(&mut self, job: QueueJob) -> ThingdResult<QueueJob> {
178 let jobs = self.queues.entry(job.queue.clone()).or_default();
179
180 if let Some(existing) = jobs.iter().find(|candidate| candidate.id == job.id) {
181 return Ok(existing.clone());
182 }
183
184 jobs.push_back(job.clone());
185 Ok(job)
186 }
187
188 fn claim_job_with_options(
189 &mut self,
190 queue: &str,
191 options: QueueClaimOptions,
192 ) -> ThingdResult<Option<QueueJob>> {
193 self.release_expired_leases(queue);
194
195 let Some(jobs) = self.queues.get_mut(queue) else {
196 return Ok(None);
197 };
198
199 let now = unix_timestamp_millis();
200 let Some(job) = jobs.iter_mut().find(|candidate| {
201 candidate.status == QueueJobStatus::Ready && candidate.available_at_ms <= now
202 }) else {
203 return Ok(None);
204 };
205
206 job.status = QueueJobStatus::Leased;
207 job.attempts += 1;
208 job.leased_at_ms = Some(now);
209 job.lease_expires_at_ms = Some(now.saturating_add(u64_to_i64(options.lease_ms)));
210
211 Ok(Some(job.clone()))
212 }
213
214 fn ack_job(&mut self, queue: &str, id: &str) -> ThingdResult<Option<QueueJob>> {
215 let Some(job) = self.find_job_mut(queue, id) else {
216 return Ok(None);
217 };
218
219 if job.status != QueueJobStatus::Leased {
220 return Err(ThingdError::Conflict(format!(
221 "job {id} must be leased before ack"
222 )));
223 }
224
225 job.status = QueueJobStatus::Completed;
226 job.completed_at_ms = Some(unix_timestamp_millis());
227
228 Ok(Some(job.clone()))
229 }
230
231 fn nack_job_with_options(
232 &mut self,
233 queue: &str,
234 id: &str,
235 options: QueueNackOptions,
236 ) -> ThingdResult<Option<QueueJob>> {
237 let Some(job) = self.find_job_mut(queue, id) else {
238 return Ok(None);
239 };
240
241 if job.status != QueueJobStatus::Leased {
242 return Err(ThingdError::Conflict(format!(
243 "job {id} must be leased before nack"
244 )));
245 }
246
247 let now = unix_timestamp_millis();
248 job.leased_at_ms = None;
249 job.lease_expires_at_ms = None;
250 if !options.error.is_empty() {
251 job.last_error = options.error;
252 }
253 job.status = if job.attempts >= job.max_attempts {
254 job.dead_at_ms = Some(now);
255 QueueJobStatus::Dead
256 } else {
257 job.available_at_ms = now.saturating_add(u64_to_i64(options.delay_ms));
258 QueueJobStatus::Ready
259 };
260
261 Ok(Some(job.clone()))
262 }
263
264 fn list_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
265 Ok(self
266 .queues
267 .get(queue)
268 .map_or_else(Vec::new, |jobs| jobs.iter().cloned().collect()))
269 }
270
271 fn list_dead_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
272 Ok(self.queues.get(queue).map_or_else(Vec::new, |jobs| {
273 jobs.iter()
274 .filter(|job| job.status == QueueJobStatus::Dead)
275 .cloned()
276 .collect()
277 }))
278 }
279
280 fn list_queues(&self) -> ThingdResult<Vec<String>> {
281 let mut queues: Vec<String> = self.queues.keys().cloned().collect();
282 queues.sort();
283 Ok(queues)
284 }
285
286 fn count_active_jobs(&self) -> ThingdResult<u64> {
287 let count = self
288 .queues
289 .values()
290 .flat_map(|jobs| jobs.iter())
291 .filter(|job| job.status != QueueJobStatus::Dead)
292 .count();
293 Ok(count as u64)
294 }
295
296 fn count_dead_jobs(&self) -> ThingdResult<u64> {
297 let count = self
298 .queues
299 .values()
300 .flat_map(|jobs| jobs.iter())
301 .filter(|job| job.status == QueueJobStatus::Dead)
302 .count();
303 Ok(count as u64)
304 }
305}
306
307impl MemoryEngine {
308 fn find_job_mut(&mut self, queue: &str, id: &str) -> Option<&mut QueueJob> {
309 self.queues
310 .get_mut(queue)?
311 .iter_mut()
312 .find(|job| job.id == id)
313 }
314
315 fn release_expired_leases(&mut self, queue: &str) {
316 let now = unix_timestamp_millis();
317
318 for job in self.queues.get_mut(queue).into_iter().flatten() {
319 if job.status == QueueJobStatus::Leased
320 && job
321 .lease_expires_at_ms
322 .is_some_and(|lease_expires_at_ms| lease_expires_at_ms <= now)
323 {
324 job.status = QueueJobStatus::Ready;
325 job.leased_at_ms = None;
326 job.lease_expires_at_ms = None;
327 }
328 }
329 }
330}
331
332impl crate::store::Searcher for MemoryEngine {
333 fn search(
334 &self,
335 query: &str,
336 options: crate::SearchOptions,
337 ) -> ThingdResult<Vec<crate::SearchHit>> {
338 let query_words: Vec<String> = query
339 .split_whitespace()
340 .map(|w| {
341 w.to_lowercase()
342 .chars()
343 .filter(|c| c.is_alphanumeric())
344 .collect()
345 })
346 .filter(|w: &String| !w.is_empty())
347 .collect();
348
349 if query_words.is_empty() {
350 return Ok(Vec::new());
351 }
352
353 let mut hits = Vec::new();
354
355 for object in self.objects.values() {
357 if let Some(ref collections) = options.collections
359 && !collections.contains(&object.key.collection)
360 {
361 continue;
362 }
363
364 if let Some(ref filter) = options.filter
366 && !matches_filter_memory(&object.body, filter)
367 {
368 continue;
369 }
370
371 let text_to_search = format!(
372 "{} {} {}",
373 object.key.collection, object.key.id, object.body
374 )
375 .to_lowercase();
376 let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
377
378 if matches_all {
379 hits.push(crate::SearchHit {
380 kind: "object".to_string(),
381 collection: object.key.collection.clone(),
382 id: object.key.id.clone(),
383 text: object.body.clone(),
384 score: 1.0,
385 body: object.body.clone(),
386 version: Some(object.version),
387 created_at: object.created_at.clone(),
388 updated_at: Some(object.updated_at.clone()),
389 event_type: None,
390 });
391 }
392 }
393
394 for event in &self.events {
396 if let Some(ref collections) = options.collections
398 && !collections.contains(&event.stream)
399 {
400 continue;
401 }
402
403 if let Some(ref filter) = options.filter
405 && !matches_filter_memory(&event.body, filter)
406 {
407 continue;
408 }
409
410 let text_to_search =
411 format!("{} {} {}", event.stream, event.event_type, event.body).to_lowercase();
412 let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
413
414 if matches_all {
415 hits.push(crate::SearchHit {
416 kind: "event".to_string(),
417 collection: event.stream.clone(),
418 id: event.sequence.to_string(),
419 text: event.body.clone(),
420 score: 1.0,
421 body: event.body.clone(),
422 version: None,
423 created_at: event.created_at.clone(),
424 updated_at: None,
425 event_type: Some(event.event_type.clone()),
426 });
427 }
428 }
429
430 if let Some(limit) = options.limit {
432 hits.truncate(limit);
433 }
434
435 Ok(hits)
436 }
437}
438
439impl LinkStore for MemoryEngine {
440 fn create_link(&mut self, mut link: Link) -> ThingdResult<Link> {
441 self.next_link_id += 1;
442 link.id = format!("link-{}", self.next_link_id);
443 if link.created_at.is_empty() {
444 link.created_at = now_iso_string();
445 }
446 self.links.push(link.clone());
447 Ok(link)
448 }
449
450 fn delete_link(&mut self, id: &str) -> ThingdResult<bool> {
451 let len_before = self.links.len();
452 self.links.retain(|l| l.id != id);
453 Ok(self.links.len() < len_before)
454 }
455
456 fn get_link(&self, id: &str) -> ThingdResult<Option<Link>> {
457 Ok(self.links.iter().find(|l| l.id == id).cloned())
458 }
459
460 fn get_neighbors(
461 &self,
462 reference: &str,
463 direction: LinkDirection,
464 options: LinkQueryOptions,
465 ) -> ThingdResult<Vec<Link>> {
466 let neighbors: Vec<Link> = self
467 .links
468 .iter()
469 .filter(|link| {
470 let matches_direction = match direction {
471 LinkDirection::Outgoing => link.from_ref == reference,
472 LinkDirection::Incoming => link.to_ref == reference,
473 LinkDirection::Both => link.from_ref == reference || link.to_ref == reference,
474 };
475 let matches_type = options
476 .link_type
477 .as_deref()
478 .is_none_or(|t| link.link_type == t);
479 matches_direction && matches_type
480 })
481 .cloned()
482 .collect();
483
484 Ok(match options.limit {
485 Some(limit) => neighbors.into_iter().take(limit).collect(),
486 None => neighbors,
487 })
488 }
489
490 fn count_links(&self) -> ThingdResult<u64> {
491 Ok(self.links.len() as u64)
492 }
493}
494
495fn matches_filter_memory(body_str: &str, filter: &serde_json::Value) -> bool {
496 let Ok(body) = serde_json::from_str::<serde_json::Value>(body_str) else {
497 return false;
498 };
499
500 let Some(filter_obj) = filter.as_object() else {
501 return true;
502 };
503
504 for (k, v) in filter_obj {
505 if body.get(k) != Some(v) {
506 return false;
507 }
508 }
509 true
510}
511
512#[cfg(test)]
513mod tests {
514 use super::*;
515 use crate::store::{LinkStore, Searcher};
516 use crate::{Link, ListObjectsOptions, SearchOptions};
517
518 #[test]
519 fn stores_and_reads_objects() {
520 let mut engine = MemoryEngine::new();
521
522 let object = engine
523 .put_object(MemoryObject::new(
524 "decisions",
525 "rust-core",
526 "{\"text\":\"Use Rust\"}",
527 ))
528 .unwrap();
529
530 let stored = engine
531 .get_object("decisions", "rust-core")
532 .unwrap()
533 .unwrap();
534 assert_eq!(object.version, 1);
535 assert_eq!(stored.key.collection, "decisions");
536 assert_eq!(stored.key.id, "rust-core");
537 }
538
539 #[test]
540 fn lists_objects_with_optional_collection_filter() {
541 let mut engine = MemoryEngine::new();
542
543 engine
544 .put_object(MemoryObject::new("decisions", "rust-core", "{}"))
545 .unwrap();
546 engine
547 .put_object(MemoryObject::new("notes", "agent-guide", "{}"))
548 .unwrap();
549
550 let filtered = engine
551 .list_objects(
552 Some(&["decisions".to_string()]),
553 &ListObjectsOptions::default(),
554 )
555 .unwrap();
556
557 assert_eq!(
558 engine
559 .list_objects(None, &ListObjectsOptions::default())
560 .unwrap()
561 .len(),
562 2
563 );
564 assert_eq!(filtered.len(), 1);
565 assert_eq!(filtered[0].key.collection, "decisions");
566 }
567
568 #[test]
569 fn appends_events_with_sequence_numbers() {
570 let mut engine = MemoryEngine::new();
571
572 let event = engine
573 .append_event(MemoryEvent::new(
574 "project:thingd",
575 "decision.made",
576 "MCP-native object storage",
577 ))
578 .unwrap();
579
580 assert_eq!(event.sequence, 1);
581 assert_eq!(
582 engine
583 .list_events(Some("project:thingd"), ListEventsOptions::default())
584 .unwrap()
585 .len(),
586 1
587 );
588 }
589
590 #[test]
591 fn claims_and_acks_queue_jobs() {
592 let mut engine = MemoryEngine::new();
593
594 engine
595 .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
596 .unwrap();
597
598 let claimed = engine.claim_job("embed").unwrap().unwrap();
599 let acked = engine.ack_job("embed", "job-1").unwrap().unwrap();
600
601 assert_eq!(claimed.status, QueueJobStatus::Leased);
602 assert_eq!(claimed.attempts, 1);
603 assert_eq!(acked.status, QueueJobStatus::Completed);
604 }
605
606 #[test]
607 fn nacks_jobs_to_dead_letter_after_max_attempts() {
608 let mut engine = MemoryEngine::new();
609
610 engine
611 .push_job(QueueJob::new("embed", "job-1", "doc-1", 1))
612 .unwrap();
613
614 engine.claim_job("embed").unwrap().unwrap();
615 let nacked = engine.nack_job("embed", "job-1").unwrap().unwrap();
616
617 assert_eq!(nacked.status, QueueJobStatus::Dead);
618 assert_eq!(engine.list_dead_jobs("embed").unwrap().len(), 1);
619 }
620
621 #[test]
622 fn does_not_claim_delayed_jobs_before_available() {
623 let mut engine = MemoryEngine::new();
624
625 engine
626 .push_job(QueueJob::new("embed", "job-1", "doc-1", 3).delay_by_ms(60_000))
627 .unwrap();
628
629 assert!(engine.claim_job("embed").unwrap().is_none());
630 }
631
632 #[test]
633 fn reclaims_jobs_after_lease_expiration() {
634 let mut engine = MemoryEngine::new();
635
636 engine
637 .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
638 .unwrap();
639
640 let first = engine
641 .claim_job_with_options("embed", QueueClaimOptions::new(0))
642 .unwrap()
643 .unwrap();
644 let second = engine.claim_job("embed").unwrap().unwrap();
645
646 assert_eq!(first.status, QueueJobStatus::Leased);
647 assert_eq!(second.status, QueueJobStatus::Leased);
648 assert_eq!(second.attempts, 2);
649 }
650
651 #[test]
652 fn nacks_jobs_with_retry_delay() {
653 let mut engine = MemoryEngine::new();
654
655 engine
656 .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
657 .unwrap();
658
659 engine.claim_job("embed").unwrap().unwrap();
660 let retried = engine
661 .nack_job_with_options("embed", "job-1", QueueNackOptions::new(60_000))
662 .unwrap()
663 .unwrap();
664
665 assert_eq!(retried.status, QueueJobStatus::Ready);
666 assert!(engine.claim_job("embed").unwrap().is_none());
667 }
668
669 #[test]
670 fn counts_objects_events_and_jobs() {
671 let mut engine = MemoryEngine::new();
672
673 assert_eq!(engine.count_objects().unwrap(), 0);
674 assert_eq!(engine.count_events().unwrap(), 0);
675 assert_eq!(engine.count_active_jobs().unwrap(), 0);
676 assert_eq!(engine.count_dead_jobs().unwrap(), 0);
677
678 engine
679 .put_object(MemoryObject::new("col-a", "o1", r#"{"v":1}"#))
680 .unwrap();
681 engine
682 .put_object(MemoryObject::new("col-a", "o2", r#"{"v":2}"#))
683 .unwrap();
684 engine
685 .put_object(MemoryObject::new("col-b", "o3", r#"{"v":3}"#))
686 .unwrap();
687 assert_eq!(engine.count_objects().unwrap(), 3);
688
689 engine
690 .append_event(MemoryEvent::new("s1", "t1", "e1"))
691 .unwrap();
692 engine
693 .append_event(MemoryEvent::new("s1", "t2", "e2"))
694 .unwrap();
695 engine
696 .append_event(MemoryEvent::new("s2", "t3", "e3"))
697 .unwrap();
698 assert_eq!(engine.count_events().unwrap(), 3);
699
700 engine
701 .push_job(QueueJob::new("work", "j1", "p1", 3))
702 .unwrap();
703 engine
704 .push_job(QueueJob::new("work", "j2", "p2", 3))
705 .unwrap();
706 engine
707 .push_job(QueueJob::new("other", "j3", "p3", 1))
708 .unwrap();
709 assert_eq!(engine.count_active_jobs().unwrap(), 3);
710
711 engine.claim_job("other").unwrap();
712 engine.nack_job("other", "j3").unwrap();
713 assert_eq!(engine.count_dead_jobs().unwrap(), 1);
714 assert_eq!(engine.count_active_jobs().unwrap(), 2);
715 }
716
717 #[test]
718 fn lists_collections_streams_and_queues() {
719 let mut engine = MemoryEngine::new();
720
721 assert!(engine.list_collections().unwrap().is_empty());
722 assert!(engine.list_streams().unwrap().is_empty());
723 assert!(engine.list_queues().unwrap().is_empty());
724
725 engine
726 .put_object(MemoryObject::new("col-a", "x", "{}"))
727 .unwrap();
728 engine
729 .put_object(MemoryObject::new("col-b", "y", "{}"))
730 .unwrap();
731 engine
732 .put_object(MemoryObject::new("col-a", "z", "{}"))
733 .unwrap();
734 let collections = engine.list_collections().unwrap();
735 assert_eq!(collections, vec!["col-a", "col-b"]);
736
737 engine
738 .append_event(MemoryEvent::new("s1", "t", "e1"))
739 .unwrap();
740 engine
741 .append_event(MemoryEvent::new("s2", "t", "e2"))
742 .unwrap();
743 let streams = engine.list_streams().unwrap();
744 assert_eq!(streams, vec!["s1", "s2"]);
745
746 engine
747 .push_job(QueueJob::new("work", "j1", "p1", 3))
748 .unwrap();
749 engine
750 .push_job(QueueJob::new("jobs", "j2", "p2", 3))
751 .unwrap();
752 let queues = engine.list_queues().unwrap();
753 assert_eq!(queues, vec!["jobs", "work"]);
754 }
755
756 #[test]
757 fn search_respects_filter_and_limit() {
758 let mut engine = MemoryEngine::new();
759
760 engine
761 .put_object(MemoryObject::new(
762 "docs",
763 "a",
764 r#"{"text":"hello world","tag":"greeting"}"#,
765 ))
766 .unwrap();
767 engine
768 .put_object(MemoryObject::new(
769 "docs",
770 "b",
771 r#"{"text":"hello there","tag":"greeting"}"#,
772 ))
773 .unwrap();
774 engine
775 .put_object(MemoryObject::new(
776 "docs",
777 "c",
778 r#"{"text":"goodbye world","tag":"farewell"}"#,
779 ))
780 .unwrap();
781
782 let all = engine.search("world", SearchOptions::default()).unwrap();
783 assert_eq!(all.len(), 2);
784
785 let limited = engine
786 .search(
787 "world",
788 SearchOptions {
789 limit: Some(1),
790 ..Default::default()
791 },
792 )
793 .unwrap();
794 assert_eq!(limited.len(), 1);
795
796 let filtered = engine
797 .search(
798 "hello",
799 SearchOptions {
800 collections: Some(vec!["docs".into()]),
801 ..Default::default()
802 },
803 )
804 .unwrap();
805 assert_eq!(filtered.len(), 2);
806 }
807
808 #[test]
811 fn list_objects_filter_returns_matching_objects() {
812 let mut engine = MemoryEngine::new();
813 engine
814 .put_object(MemoryObject::new("w", "a", r#"{"color":"red","size":1}"#))
815 .unwrap();
816 engine
817 .put_object(MemoryObject::new("w", "b", r#"{"color":"blue","size":2}"#))
818 .unwrap();
819 engine
820 .put_object(MemoryObject::new("w", "c", r#"{"color":"red","size":3}"#))
821 .unwrap();
822
823 let opts = ListObjectsOptions {
824 filter: vec![("color".into(), serde_json::json!("red"))],
825 ..Default::default()
826 };
827 let results = engine
828 .list_objects(Some(&["w".to_string()]), &opts)
829 .unwrap();
830 assert_eq!(results.len(), 2);
831 assert!(results.iter().all(|o| o.body.contains("\"red\"")));
832 }
833
834 #[test]
835 fn list_objects_filter_no_match_returns_empty() {
836 let mut engine = MemoryEngine::new();
837 engine
838 .put_object(MemoryObject::new("w", "a", r#"{"color":"red"}"#))
839 .unwrap();
840
841 let opts = ListObjectsOptions {
842 filter: vec![("color".into(), serde_json::json!("green"))],
843 ..Default::default()
844 };
845 let results = engine
846 .list_objects(Some(&["w".to_string()]), &opts)
847 .unwrap();
848 assert!(results.is_empty());
849 }
850
851 #[test]
852 fn list_objects_limit_truncates_results() {
853 let mut engine = MemoryEngine::new();
854 for i in 0..5u32 {
855 engine
856 .put_object(MemoryObject::new("col", format!("id-{i}"), "{}"))
857 .unwrap();
858 }
859
860 let opts = ListObjectsOptions {
861 limit: Some(3),
862 ..Default::default()
863 };
864 let results = engine
865 .list_objects(Some(&["col".to_string()]), &opts)
866 .unwrap();
867 assert_eq!(results.len(), 3);
868 }
869
870 #[test]
871 fn list_objects_offset_skips_results() {
872 let mut engine = MemoryEngine::new();
873 for i in 0..5u32 {
874 engine
875 .put_object(MemoryObject::new("col", format!("id-{i}"), "{}"))
876 .unwrap();
877 }
878
879 let opts = ListObjectsOptions {
880 offset: Some(3),
881 ..Default::default()
882 };
883 let results = engine
884 .list_objects(Some(&["col".to_string()]), &opts)
885 .unwrap();
886 assert_eq!(results.len(), 2);
887 }
888
889 #[test]
890 fn list_objects_filter_and_limit_combined() {
891 let mut engine = MemoryEngine::new();
892 for i in 0..4u32 {
893 engine
894 .put_object(MemoryObject::new(
895 "col",
896 format!("id-{i}"),
897 r#"{"status":"active"}"#,
898 ))
899 .unwrap();
900 }
901 engine
902 .put_object(MemoryObject::new("col", "id-4", r#"{"status":"inactive"}"#))
903 .unwrap();
904
905 let opts = ListObjectsOptions {
906 filter: vec![("status".into(), serde_json::json!("active"))],
907 limit: Some(2),
908 ..Default::default()
909 };
910 let results = engine
911 .list_objects(Some(&["col".to_string()]), &opts)
912 .unwrap();
913 assert_eq!(results.len(), 2);
914 assert!(results.iter().all(|o| o.body.contains("active")));
915 }
916
917 #[test]
920 fn append_event_returns_sequence_and_timestamp() {
921 let mut engine = MemoryEngine::new();
922
923 let first = engine
924 .append_event(MemoryEvent::new("s", "ev.first", r#"{"x":1}"#))
925 .unwrap();
926 let second = engine
927 .append_event(MemoryEvent::new("s", "ev.second", r#"{"x":2}"#))
928 .unwrap();
929
930 assert_eq!(first.sequence, 1);
931 assert_eq!(second.sequence, 2);
932 assert!(!first.created_at.is_empty(), "created_at should be set");
933 }
934
935 #[test]
938 fn create_link_ids_are_unique_after_delete() {
939 let mut engine = MemoryEngine::new();
940 engine
941 .put_object(MemoryObject::new("n", "a", "{}"))
942 .unwrap();
943 engine
944 .put_object(MemoryObject::new("n", "b", "{}"))
945 .unwrap();
946 engine
947 .put_object(MemoryObject::new("n", "c", "{}"))
948 .unwrap();
949
950 let l1 = engine
951 .create_link(Link::new("n/a", "connects", "n/b"))
952 .unwrap();
953 let l2 = engine
954 .create_link(Link::new("n/b", "connects", "n/c"))
955 .unwrap();
956
957 engine.delete_link(&l1.id).unwrap();
959
960 let l3 = engine
961 .create_link(Link::new("n/a", "connects", "n/c"))
962 .unwrap();
963
964 assert_ne!(l3.id, l2.id, "IDs must not collide after a delete");
965 assert!(l3.id > l2.id, "IDs must be monotonically increasing");
966 }
967}