1use std::collections::{BTreeMap, VecDeque};
4
5use crate::model::{ListEventsOptions, ListObjectsOptions};
6use crate::{
7 EventLog, Link, LinkDirection, LinkQueryOptions, LinkStore, MemoryEvent, MemoryObject,
8 ObjectKey, ObjectStore, QueueClaimOptions, QueueJob, QueueJobStatus, QueueNackOptions,
9 QueueStore, ThingdError, ThingdResult, now_iso_string, u64_to_i64, unix_timestamp_millis,
10};
11
12#[derive(Default)]
28pub struct MemoryEngine {
29 objects: BTreeMap<ObjectKey, MemoryObject>,
30 events: Vec<MemoryEvent>,
31 queues: BTreeMap<String, VecDeque<QueueJob>>,
32 links: Vec<Link>,
33 next_event_sequence: u64,
34 next_link_id: u64,
35}
36
37impl MemoryEngine {
38 pub fn new() -> Self {
40 Self::default()
41 }
42}
43
44impl ObjectStore for MemoryEngine {
45 fn put_object(&mut self, mut object: MemoryObject) -> ThingdResult<MemoryObject> {
46 let now = now_iso_string();
47 let version = self
48 .objects
49 .get(&object.key)
50 .map_or(1, |existing| existing.version + 1);
51
52 object.version = version;
53 object.updated_at.clone_from(&now);
54 if object.created_at.is_empty() {
55 object.created_at = now;
56 }
57 self.objects.insert(object.key.clone(), object.clone());
58
59 Ok(object)
60 }
61
62 fn get_object(&self, collection: &str, id: &str) -> ThingdResult<Option<MemoryObject>> {
63 Ok(self.objects.get(&ObjectKey::new(collection, id)).cloned())
64 }
65
66 fn list_objects(
67 &self,
68 collections: Option<&[String]>,
69 options: &ListObjectsOptions,
70 ) -> ThingdResult<Vec<MemoryObject>> {
71 let mut objects: Vec<MemoryObject> =
72 self.objects
73 .values()
74 .filter(|object| {
75 collections.is_none_or(|allowed| allowed.contains(&object.key.collection))
76 })
77 .filter(|object| {
78 if options.filter.is_empty() {
79 return true;
80 }
81 let Ok(body) = serde_json::from_str::<serde_json::Value>(&object.body) else {
82 return false;
83 };
84 options.filter.iter().all(|(key, expected)| {
85 body.get(key.as_str()).is_some_and(|v| v == expected)
86 })
87 })
88 .cloned()
89 .collect();
90
91 if let Some(ref sort_by) = options.sort_by {
93 use crate::model::SortDirection;
94 let asc = sort_by.direction == SortDirection::Asc;
95 objects.sort_by(|a, b| {
96 let cmp = match sort_by.field.as_str() {
97 "id" => a.key.id.cmp(&b.key.id),
98 "collection" => a.key.collection.cmp(&b.key.collection),
99 "created_at" => a.created_at.cmp(&b.created_at),
100 "updated_at" => a.updated_at.cmp(&b.updated_at),
101 "version" => a.version.cmp(&b.version),
102 _ => std::cmp::Ordering::Equal,
103 };
104 if asc { cmp } else { cmp.reverse() }
105 });
106 }
107
108 if let Some(offset) = options.offset {
109 let skip = usize::try_from(offset).unwrap_or(usize::MAX);
110 objects = objects.into_iter().skip(skip).collect();
111 }
112 if let Some(limit) = options.limit {
113 let take = usize::try_from(limit).unwrap_or(usize::MAX);
114 objects.truncate(take);
115 }
116
117 Ok(objects)
118 }
119
120 fn delete_object(&mut self, collection: &str, id: &str) -> ThingdResult<bool> {
121 Ok(self
122 .objects
123 .remove(&ObjectKey::new(collection, id))
124 .is_some())
125 }
126
127 fn count_objects(&self) -> ThingdResult<u64> {
128 Ok(self.objects.len() as u64)
129 }
130
131 fn list_collections(&self) -> ThingdResult<Vec<String>> {
132 let mut collections: Vec<String> = self
133 .objects
134 .keys()
135 .map(|key| key.collection.clone())
136 .collect();
137 collections.sort();
138 collections.dedup();
139 Ok(collections)
140 }
141}
142
143impl EventLog for MemoryEngine {
144 fn append_event(&mut self, mut event: MemoryEvent) -> ThingdResult<MemoryEvent> {
145 self.next_event_sequence += 1;
146 event.sequence = self.next_event_sequence;
147 if event.created_at.is_empty() {
148 event.created_at = now_iso_string();
149 }
150 self.events.push(event.clone());
151
152 Ok(event)
153 }
154
155 fn list_events(
156 &self,
157 stream: Option<&str>,
158 options: ListEventsOptions,
159 ) -> ThingdResult<Vec<MemoryEvent>> {
160 let events = self
161 .events
162 .iter()
163 .filter(|event| stream.is_none_or(|target| event.stream == target))
164 .filter(|event| options.from_sequence.is_none_or(|seq| event.sequence > seq))
165 .cloned()
166 .collect::<Vec<_>>();
167
168 Ok(match options.limit {
169 Some(limit) => events
170 .into_iter()
171 .take(usize::try_from(limit).unwrap_or(usize::MAX))
172 .collect(),
173 None => events,
174 })
175 }
176
177 fn count_events(&self) -> ThingdResult<u64> {
178 Ok(self.events.len() as u64)
179 }
180
181 fn list_streams(&self) -> ThingdResult<Vec<String>> {
182 let mut streams: Vec<String> = self
183 .events
184 .iter()
185 .map(|event| event.stream.clone())
186 .collect();
187 streams.sort();
188 streams.dedup();
189 Ok(streams)
190 }
191}
192
193impl QueueStore for MemoryEngine {
194 fn push_job(&mut self, job: QueueJob) -> ThingdResult<QueueJob> {
195 let jobs = self.queues.entry(job.queue.clone()).or_default();
196
197 if let Some(existing) = jobs.iter().find(|candidate| candidate.id == job.id) {
198 return Ok(existing.clone());
199 }
200
201 jobs.push_back(job.clone());
202 Ok(job)
203 }
204
205 fn claim_job_with_options(
206 &mut self,
207 queue: &str,
208 options: QueueClaimOptions,
209 ) -> ThingdResult<Option<QueueJob>> {
210 self.release_expired_leases(queue);
211
212 let Some(jobs) = self.queues.get_mut(queue) else {
213 return Ok(None);
214 };
215
216 let now = unix_timestamp_millis();
217 let Some(job) = jobs.iter_mut().find(|candidate| {
218 candidate.status == QueueJobStatus::Ready && candidate.available_at_ms <= now
219 }) else {
220 return Ok(None);
221 };
222
223 job.status = QueueJobStatus::Leased;
224 job.attempts += 1;
225 job.leased_at_ms = Some(now);
226 job.lease_expires_at_ms = Some(now.saturating_add(u64_to_i64(options.lease_ms)));
227
228 Ok(Some(job.clone()))
229 }
230
231 fn ack_job(&mut self, queue: &str, id: &str) -> ThingdResult<Option<QueueJob>> {
232 let Some(job) = self.find_job_mut(queue, id) else {
233 return Ok(None);
234 };
235
236 if job.status != QueueJobStatus::Leased {
237 return Err(ThingdError::Conflict(format!(
238 "job {id} must be leased before ack"
239 )));
240 }
241
242 job.status = QueueJobStatus::Completed;
243 job.completed_at_ms = Some(unix_timestamp_millis());
244
245 Ok(Some(job.clone()))
246 }
247
248 fn nack_job_with_options(
249 &mut self,
250 queue: &str,
251 id: &str,
252 options: QueueNackOptions,
253 ) -> ThingdResult<Option<QueueJob>> {
254 let Some(job) = self.find_job_mut(queue, id) else {
255 return Ok(None);
256 };
257
258 if job.status != QueueJobStatus::Leased {
259 return Err(ThingdError::Conflict(format!(
260 "job {id} must be leased before nack"
261 )));
262 }
263
264 let now = unix_timestamp_millis();
265 job.leased_at_ms = None;
266 job.lease_expires_at_ms = None;
267 if !options.error.is_empty() {
268 job.last_error = options.error;
269 }
270 job.status = if job.attempts >= job.max_attempts {
271 job.dead_at_ms = Some(now);
272 QueueJobStatus::Dead
273 } else {
274 job.available_at_ms = now.saturating_add(u64_to_i64(options.delay_ms));
275 QueueJobStatus::Ready
276 };
277
278 Ok(Some(job.clone()))
279 }
280
281 fn list_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
282 Ok(self
283 .queues
284 .get(queue)
285 .map_or_else(Vec::new, |jobs| jobs.iter().cloned().collect()))
286 }
287
288 fn list_dead_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
289 Ok(self.queues.get(queue).map_or_else(Vec::new, |jobs| {
290 jobs.iter()
291 .filter(|job| job.status == QueueJobStatus::Dead)
292 .cloned()
293 .collect()
294 }))
295 }
296
297 fn list_queues(&self) -> ThingdResult<Vec<String>> {
298 let mut queues: Vec<String> = self.queues.keys().cloned().collect();
299 queues.sort();
300 Ok(queues)
301 }
302
303 fn count_active_jobs(&self) -> ThingdResult<u64> {
304 let count = self
305 .queues
306 .values()
307 .flat_map(|jobs| jobs.iter())
308 .filter(|job| job.status != QueueJobStatus::Dead)
309 .count();
310 Ok(count as u64)
311 }
312
313 fn count_dead_jobs(&self) -> ThingdResult<u64> {
314 let count = self
315 .queues
316 .values()
317 .flat_map(|jobs| jobs.iter())
318 .filter(|job| job.status == QueueJobStatus::Dead)
319 .count();
320 Ok(count as u64)
321 }
322}
323
324impl MemoryEngine {
325 fn find_job_mut(&mut self, queue: &str, id: &str) -> Option<&mut QueueJob> {
326 self.queues
327 .get_mut(queue)?
328 .iter_mut()
329 .find(|job| job.id == id)
330 }
331
332 fn release_expired_leases(&mut self, queue: &str) {
333 let now = unix_timestamp_millis();
334
335 for job in self.queues.get_mut(queue).into_iter().flatten() {
336 if job.status == QueueJobStatus::Leased
337 && job
338 .lease_expires_at_ms
339 .is_some_and(|lease_expires_at_ms| lease_expires_at_ms <= now)
340 {
341 job.status = QueueJobStatus::Ready;
342 job.leased_at_ms = None;
343 job.lease_expires_at_ms = None;
344 }
345 }
346 }
347}
348
349impl crate::store::Searcher for MemoryEngine {
350 fn search(
351 &self,
352 query: &str,
353 options: crate::SearchOptions,
354 ) -> ThingdResult<Vec<crate::SearchHit>> {
355 let query_words: Vec<String> = query
356 .split_whitespace()
357 .map(|w| {
358 w.to_lowercase()
359 .chars()
360 .filter(|c| c.is_alphanumeric())
361 .collect()
362 })
363 .filter(|w: &String| !w.is_empty())
364 .collect();
365
366 if query_words.is_empty() {
367 return Ok(Vec::new());
368 }
369
370 let mut hits = Vec::new();
371
372 for object in self.objects.values() {
374 if let Some(ref collections) = options.collections
376 && !collections.contains(&object.key.collection)
377 {
378 continue;
379 }
380
381 if let Some(ref filter) = options.filter
383 && !matches_filter_memory(&object.body, filter)
384 {
385 continue;
386 }
387
388 let text_to_search = format!(
389 "{} {} {}",
390 object.key.collection, object.key.id, object.body
391 )
392 .to_lowercase();
393 let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
394
395 if matches_all {
396 hits.push(crate::SearchHit {
397 kind: "object".to_string(),
398 collection: object.key.collection.clone(),
399 id: object.key.id.clone(),
400 text: object.body.clone(),
401 score: 1.0,
402 body: object.body.clone(),
403 version: Some(object.version),
404 created_at: object.created_at.clone(),
405 updated_at: Some(object.updated_at.clone()),
406 event_type: None,
407 });
408 }
409 }
410
411 for event in &self.events {
413 if let Some(ref collections) = options.collections
415 && !collections.contains(&event.stream)
416 {
417 continue;
418 }
419
420 if let Some(ref filter) = options.filter
422 && !matches_filter_memory(&event.body, filter)
423 {
424 continue;
425 }
426
427 let text_to_search =
428 format!("{} {} {}", event.stream, event.event_type, event.body).to_lowercase();
429 let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
430
431 if matches_all {
432 hits.push(crate::SearchHit {
433 kind: "event".to_string(),
434 collection: event.stream.clone(),
435 id: event.sequence.to_string(),
436 text: event.body.clone(),
437 score: 1.0,
438 body: event.body.clone(),
439 version: None,
440 created_at: event.created_at.clone(),
441 updated_at: None,
442 event_type: Some(event.event_type.clone()),
443 });
444 }
445 }
446
447 if let Some(limit) = options.limit {
449 hits.truncate(limit);
450 }
451
452 Ok(hits)
453 }
454}
455
456impl LinkStore for MemoryEngine {
457 fn create_link(&mut self, mut link: Link) -> ThingdResult<Link> {
458 self.next_link_id += 1;
459 link.id = format!("link-{}", self.next_link_id);
460 if link.created_at.is_empty() {
461 link.created_at = now_iso_string();
462 }
463 self.links.push(link.clone());
464 Ok(link)
465 }
466
467 fn delete_link(&mut self, id: &str) -> ThingdResult<bool> {
468 let len_before = self.links.len();
469 self.links.retain(|l| l.id != id);
470 Ok(self.links.len() < len_before)
471 }
472
473 fn get_link(&self, id: &str) -> ThingdResult<Option<Link>> {
474 Ok(self.links.iter().find(|l| l.id == id).cloned())
475 }
476
477 fn get_neighbors(
478 &self,
479 reference: &str,
480 direction: LinkDirection,
481 options: LinkQueryOptions,
482 ) -> ThingdResult<Vec<Link>> {
483 let neighbors: Vec<Link> = self
484 .links
485 .iter()
486 .filter(|link| {
487 let matches_direction = match direction {
488 LinkDirection::Outgoing => link.from_ref == reference,
489 LinkDirection::Incoming => link.to_ref == reference,
490 LinkDirection::Both => link.from_ref == reference || link.to_ref == reference,
491 };
492 let matches_type = options
493 .link_type
494 .as_deref()
495 .is_none_or(|t| link.link_type == t);
496 matches_direction && matches_type
497 })
498 .cloned()
499 .collect();
500
501 Ok(match options.limit {
502 Some(limit) => neighbors.into_iter().take(limit).collect(),
503 None => neighbors,
504 })
505 }
506
507 fn count_links(&self) -> ThingdResult<u64> {
508 Ok(self.links.len() as u64)
509 }
510}
511
512fn matches_filter_memory(body_str: &str, filter: &serde_json::Value) -> bool {
513 let Ok(body) = serde_json::from_str::<serde_json::Value>(body_str) else {
514 return false;
515 };
516
517 let Some(filter_obj) = filter.as_object() else {
518 return true;
519 };
520
521 for (k, v) in filter_obj {
522 if body.get(k) != Some(v) {
523 return false;
524 }
525 }
526 true
527}
528
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::{LinkStore, Searcher};
    use crate::{Link, ListObjectsOptions, SearchOptions};

    /// A stored object can be read back and starts at version 1.
    #[test]
    fn stores_and_reads_objects() {
        let mut engine = MemoryEngine::new();

        let object = engine
            .put_object(MemoryObject::new(
                "decisions",
                "rust-core",
                "{\"text\":\"Use Rust\"}",
            ))
            .unwrap();

        let stored = engine
            .get_object("decisions", "rust-core")
            .unwrap()
            .unwrap();
        assert_eq!(object.version, 1);
        assert_eq!(stored.key.collection, "decisions");
        assert_eq!(stored.key.id, "rust-core");
    }

    /// Listing without a collection returns everything; with one, only that
    /// collection's objects.
    #[test]
    fn lists_objects_with_optional_collection_filter() {
        let mut engine = MemoryEngine::new();

        engine
            .put_object(MemoryObject::new("decisions", "rust-core", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("notes", "agent-guide", "{}"))
            .unwrap();

        let filtered = engine
            .list_objects(
                Some(&["decisions".to_string()]),
                &ListObjectsOptions::default(),
            )
            .unwrap();

        assert_eq!(
            engine
                .list_objects(None, &ListObjectsOptions::default())
                .unwrap()
                .len(),
            2
        );
        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].key.collection, "decisions");
    }

    /// The first appended event gets sequence 1 and is listed for its stream.
    #[test]
    fn appends_events_with_sequence_numbers() {
        let mut engine = MemoryEngine::new();

        let event = engine
            .append_event(MemoryEvent::new(
                "project:thingd",
                "decision.made",
                "MCP-native object storage",
            ))
            .unwrap();

        assert_eq!(event.sequence, 1);
        assert_eq!(
            engine
                .list_events(Some("project:thingd"), ListEventsOptions::default())
                .unwrap()
                .len(),
            1
        );
    }

    /// Claiming leases a job and counts an attempt; acking completes it.
    #[test]
    fn claims_and_acks_queue_jobs() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        let claimed = engine.claim_job("embed").unwrap().unwrap();
        let acked = engine.ack_job("embed", "job-1").unwrap().unwrap();

        assert_eq!(claimed.status, QueueJobStatus::Leased);
        assert_eq!(claimed.attempts, 1);
        assert_eq!(acked.status, QueueJobStatus::Completed);
    }

    /// A nack after the last allowed attempt dead-letters the job.
    #[test]
    fn nacks_jobs_to_dead_letter_after_max_attempts() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 1))
            .unwrap();

        engine.claim_job("embed").unwrap().unwrap();
        let nacked = engine.nack_job("embed", "job-1").unwrap().unwrap();

        assert_eq!(nacked.status, QueueJobStatus::Dead);
        assert_eq!(engine.list_dead_jobs("embed").unwrap().len(), 1);
    }

    /// A job pushed with a delay cannot be claimed before it is available.
    #[test]
    fn does_not_claim_delayed_jobs_before_available() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3).delay_by_ms(60_000))
            .unwrap();

        assert!(engine.claim_job("embed").unwrap().is_none());
    }

    /// A zero-length lease expires immediately, so the job can be claimed
    /// again and the attempt counter keeps increasing.
    #[test]
    fn reclaims_jobs_after_lease_expiration() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        let first = engine
            .claim_job_with_options("embed", QueueClaimOptions::new(0))
            .unwrap()
            .unwrap();
        let second = engine.claim_job("embed").unwrap().unwrap();

        assert_eq!(first.status, QueueJobStatus::Leased);
        assert_eq!(second.status, QueueJobStatus::Leased);
        assert_eq!(second.attempts, 2);
    }

    /// A nack with a retry delay makes the job ready but not yet claimable.
    #[test]
    fn nacks_jobs_with_retry_delay() {
        let mut engine = MemoryEngine::new();

        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        engine.claim_job("embed").unwrap().unwrap();
        let retried = engine
            .nack_job_with_options("embed", "job-1", QueueNackOptions::new(60_000))
            .unwrap()
            .unwrap();

        assert_eq!(retried.status, QueueJobStatus::Ready);
        assert!(engine.claim_job("embed").unwrap().is_none());
    }

    /// The count methods track objects, events, and active/dead jobs.
    #[test]
    fn counts_objects_events_and_jobs() {
        let mut engine = MemoryEngine::new();

        assert_eq!(engine.count_objects().unwrap(), 0);
        assert_eq!(engine.count_events().unwrap(), 0);
        assert_eq!(engine.count_active_jobs().unwrap(), 0);
        assert_eq!(engine.count_dead_jobs().unwrap(), 0);

        engine
            .put_object(MemoryObject::new("col-a", "o1", r#"{"v":1}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-a", "o2", r#"{"v":2}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-b", "o3", r#"{"v":3}"#))
            .unwrap();
        assert_eq!(engine.count_objects().unwrap(), 3);

        engine
            .append_event(MemoryEvent::new("s1", "t1", "e1"))
            .unwrap();
        engine
            .append_event(MemoryEvent::new("s1", "t2", "e2"))
            .unwrap();
        engine
            .append_event(MemoryEvent::new("s2", "t3", "e3"))
            .unwrap();
        assert_eq!(engine.count_events().unwrap(), 3);

        engine
            .push_job(QueueJob::new("work", "j1", "p1", 3))
            .unwrap();
        engine
            .push_job(QueueJob::new("work", "j2", "p2", 3))
            .unwrap();
        engine
            .push_job(QueueJob::new("other", "j3", "p3", 1))
            .unwrap();
        assert_eq!(engine.count_active_jobs().unwrap(), 3);

        engine.claim_job("other").unwrap();
        engine.nack_job("other", "j3").unwrap();
        assert_eq!(engine.count_dead_jobs().unwrap(), 1);
        assert_eq!(engine.count_active_jobs().unwrap(), 2);
    }

    /// Collection, stream, and queue names are listed sorted and de-duplicated.
    #[test]
    fn lists_collections_streams_and_queues() {
        let mut engine = MemoryEngine::new();

        assert!(engine.list_collections().unwrap().is_empty());
        assert!(engine.list_streams().unwrap().is_empty());
        assert!(engine.list_queues().unwrap().is_empty());

        engine
            .put_object(MemoryObject::new("col-a", "x", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-b", "y", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("col-a", "z", "{}"))
            .unwrap();
        let collections = engine.list_collections().unwrap();
        assert_eq!(collections, vec!["col-a", "col-b"]);

        engine
            .append_event(MemoryEvent::new("s1", "t", "e1"))
            .unwrap();
        engine
            .append_event(MemoryEvent::new("s2", "t", "e2"))
            .unwrap();
        let streams = engine.list_streams().unwrap();
        assert_eq!(streams, vec!["s1", "s2"]);

        engine
            .push_job(QueueJob::new("work", "j1", "p1", 3))
            .unwrap();
        engine
            .push_job(QueueJob::new("jobs", "j2", "p2", 3))
            .unwrap();
        let queues = engine.list_queues().unwrap();
        assert_eq!(queues, vec!["jobs", "work"]);
    }

    /// Search honours the limit, the collection restriction, and the body
    /// filter.
    #[test]
    fn search_respects_filter_and_limit() {
        let mut engine = MemoryEngine::new();

        engine
            .put_object(MemoryObject::new(
                "docs",
                "a",
                r#"{"text":"hello world","tag":"greeting"}"#,
            ))
            .unwrap();
        engine
            .put_object(MemoryObject::new(
                "docs",
                "b",
                r#"{"text":"hello there","tag":"greeting"}"#,
            ))
            .unwrap();
        engine
            .put_object(MemoryObject::new(
                "docs",
                "c",
                r#"{"text":"goodbye world","tag":"farewell"}"#,
            ))
            .unwrap();

        let all = engine.search("world", SearchOptions::default()).unwrap();
        assert_eq!(all.len(), 2);

        let limited = engine
            .search(
                "world",
                SearchOptions {
                    limit: Some(1),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(limited.len(), 1);

        let filtered = engine
            .search(
                "hello",
                SearchOptions {
                    collections: Some(vec!["docs".into()]),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(filtered.len(), 2);

        // Two objects mention "world", but only "c" carries the farewell tag.
        let by_tag = engine
            .search(
                "world",
                SearchOptions {
                    filter: Some(serde_json::json!({ "tag": "farewell" })),
                    ..Default::default()
                },
            )
            .unwrap();
        assert_eq!(by_tag.len(), 1);
        assert_eq!(by_tag[0].id, "c");
    }

    /// A body filter keeps only the objects whose field equals the expected
    /// value.
    #[test]
    fn list_objects_filter_returns_matching_objects() {
        let mut engine = MemoryEngine::new();
        engine
            .put_object(MemoryObject::new("w", "a", r#"{"color":"red","size":1}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "b", r#"{"color":"blue","size":2}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "c", r#"{"color":"red","size":3}"#))
            .unwrap();

        let opts = ListObjectsOptions {
            filter: vec![("color".into(), serde_json::json!("red"))],
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["w".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|o| o.body.contains("\"red\"")));
    }

    /// A body filter that matches nothing yields an empty list.
    #[test]
    fn list_objects_filter_no_match_returns_empty() {
        let mut engine = MemoryEngine::new();
        engine
            .put_object(MemoryObject::new("w", "a", r#"{"color":"red"}"#))
            .unwrap();

        let opts = ListObjectsOptions {
            filter: vec![("color".into(), serde_json::json!("green"))],
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["w".to_string()]), &opts)
            .unwrap();
        assert!(results.is_empty());
    }

    /// `limit` caps the number of returned objects.
    #[test]
    fn list_objects_limit_truncates_results() {
        let mut engine = MemoryEngine::new();
        for i in 0..5u32 {
            engine
                .put_object(MemoryObject::new("col", format!("id-{i}"), "{}"))
                .unwrap();
        }

        let opts = ListObjectsOptions {
            limit: Some(3),
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["col".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 3);
    }

    /// `offset` skips that many objects from the front of the result.
    #[test]
    fn list_objects_offset_skips_results() {
        let mut engine = MemoryEngine::new();
        for i in 0..5u32 {
            engine
                .put_object(MemoryObject::new("col", format!("id-{i}"), "{}"))
                .unwrap();
        }

        let opts = ListObjectsOptions {
            offset: Some(3),
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["col".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 2);
    }

    /// The limit is applied after the body filter.
    #[test]
    fn list_objects_filter_and_limit_combined() {
        let mut engine = MemoryEngine::new();
        for i in 0..4u32 {
            engine
                .put_object(MemoryObject::new(
                    "col",
                    format!("id-{i}"),
                    r#"{"status":"active"}"#,
                ))
                .unwrap();
        }
        engine
            .put_object(MemoryObject::new("col", "id-4", r#"{"status":"inactive"}"#))
            .unwrap();

        let opts = ListObjectsOptions {
            filter: vec![("status".into(), serde_json::json!("active"))],
            limit: Some(2),
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["col".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|o| o.body.contains("active")));
    }

    /// Appended events get consecutive sequence numbers and a timestamp.
    #[test]
    fn append_event_returns_sequence_and_timestamp() {
        let mut engine = MemoryEngine::new();

        let first = engine
            .append_event(MemoryEvent::new("s", "ev.first", r#"{"x":1}"#))
            .unwrap();
        let second = engine
            .append_event(MemoryEvent::new("s", "ev.second", r#"{"x":2}"#))
            .unwrap();

        assert_eq!(first.sequence, 1);
        assert_eq!(second.sequence, 2);
        assert!(!first.created_at.is_empty(), "created_at should be set");
    }

    /// Link ids keep increasing after a delete and never collide with an
    /// existing or a deleted link.
    #[test]
    fn create_link_ids_are_unique_after_delete() {
        /// Extracts `n` from an id of the form `link-<n>`.
        fn numeric_suffix(id: &str) -> u64 {
            id.strip_prefix("link-")
                .and_then(|n| n.parse().ok())
                .expect("link ids have the form link-<n>")
        }

        let mut engine = MemoryEngine::new();
        engine
            .put_object(MemoryObject::new("n", "a", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("n", "b", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("n", "c", "{}"))
            .unwrap();

        let l1 = engine
            .create_link(Link::new("n/a", "connects", "n/b"))
            .unwrap();
        let l2 = engine
            .create_link(Link::new("n/b", "connects", "n/c"))
            .unwrap();

        engine.delete_link(&l1.id).unwrap();

        let l3 = engine
            .create_link(Link::new("n/a", "connects", "n/c"))
            .unwrap();

        assert_ne!(l3.id, l2.id, "IDs must not collide after a delete");
        assert_ne!(l3.id, l1.id, "a deleted link's ID must not be reused");
        // Compare the numeric suffixes: a plain string comparison would rank
        // "link-10" below "link-9".
        assert!(
            numeric_suffix(&l3.id) > numeric_suffix(&l2.id),
            "IDs must be monotonically increasing"
        );
    }

    /// Sorting by `created_at` descending returns all objects with
    /// non-increasing timestamps.
    #[test]
    fn list_objects_sort_by_created_at_desc() {
        let mut engine = MemoryEngine::new();
        engine
            .put_object(MemoryObject::new("w", "a", r#"{"x":1}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "b", r#"{"x":2}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "c", r#"{"x":3}"#))
            .unwrap();

        let opts = ListObjectsOptions {
            sort_by: Some(crate::SortBy::desc("created_at")),
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["w".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 3);
        // Timestamps may tie, so only require a non-increasing order.
        assert!(
            results
                .windows(2)
                .all(|pair| pair[0].created_at >= pair[1].created_at),
            "results must be ordered by created_at, newest first"
        );
    }

    /// Sorting by `id` ascending orders the objects regardless of insert
    /// order.
    #[test]
    fn list_objects_sort_by_id_asc() {
        let mut engine = MemoryEngine::new();
        engine
            .put_object(MemoryObject::new("w", "c", r#"{"x":3}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "a", r#"{"x":1}"#))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "b", r#"{"x":2}"#))
            .unwrap();

        let opts = ListObjectsOptions {
            sort_by: Some(crate::SortBy::asc("id")),
            ..Default::default()
        };
        let results = engine
            .list_objects(Some(&["w".to_string()]), &opts)
            .unwrap();
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].key.id, "a");
        assert_eq!(results[1].key.id, "b");
        assert_eq!(results[2].key.id, "c");
    }

    /// Batch delete removes exactly the listed objects and reports the count.
    #[test]
    fn delete_objects_batch_deletes_multiple() {
        let mut engine = MemoryEngine::new();
        engine
            .put_object(MemoryObject::new("w", "a", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "b", "{}"))
            .unwrap();
        engine
            .put_object(MemoryObject::new("w", "c", "{}"))
            .unwrap();

        let keys = vec![
            ("w".to_string(), "a".to_string()),
            ("w".to_string(), "b".to_string()),
        ];
        let deleted = engine.delete_objects_batch(&keys).unwrap();
        assert_eq!(deleted, 2);
        assert_eq!(engine.count_objects().unwrap(), 1);
        assert!(engine.get_object("w", "a").unwrap().is_none());
        assert!(engine.get_object("w", "b").unwrap().is_none());
        assert!(engine.get_object("w", "c").unwrap().is_some());
    }

    /// Putting with `index: false` still stores the object unchanged.
    #[test]
    fn put_object_with_options_skip_index() {
        let mut engine = MemoryEngine::new();
        let opts = crate::PutObjectOptions { index: false };

        engine
            .put_object_with_options(MemoryObject::new("w", "a", r#"{"text":"hello"}"#), opts)
            .unwrap();

        let obj = engine.get_object("w", "a").unwrap();
        assert!(obj.is_some());
        assert_eq!(obj.unwrap().body, r#"{"text":"hello"}"#);
    }
}