1use std::collections::{BTreeMap, VecDeque};
4
5use crate::model::ListEventsOptions;
6use crate::{
7 EventLog, Link, LinkDirection, LinkQueryOptions, LinkStore, MemoryEvent, MemoryObject,
8 ObjectKey, ObjectStore, QueueClaimOptions, QueueJob, QueueJobStatus, QueueNackOptions,
9 QueueStore, ThingdError, ThingdResult, now_iso_string, u64_to_i64, unix_timestamp_millis,
10};
11
12#[derive(Default)]
28pub struct MemoryEngine {
29 objects: BTreeMap<ObjectKey, MemoryObject>,
30 events: Vec<MemoryEvent>,
31 queues: BTreeMap<String, VecDeque<QueueJob>>,
32 links: Vec<Link>,
33 next_event_sequence: u64,
34}
35
36impl MemoryEngine {
37 pub fn new() -> Self {
39 Self::default()
40 }
41}
42
43impl ObjectStore for MemoryEngine {
44 fn put_object(&mut self, mut object: MemoryObject) -> ThingdResult<MemoryObject> {
45 let now = now_iso_string();
46 let version = self
47 .objects
48 .get(&object.key)
49 .map_or(1, |existing| existing.version + 1);
50
51 object.version = version;
52 object.updated_at.clone_from(&now);
53 if object.created_at.is_empty() {
54 object.created_at = now;
55 }
56 self.objects.insert(object.key.clone(), object.clone());
57
58 Ok(object)
59 }
60
61 fn get_object(&self, collection: &str, id: &str) -> ThingdResult<Option<MemoryObject>> {
62 Ok(self.objects.get(&ObjectKey::new(collection, id)).cloned())
63 }
64
65 fn list_objects(&self, collections: Option<&[String]>) -> ThingdResult<Vec<MemoryObject>> {
66 let objects = self
67 .objects
68 .values()
69 .filter(|object| {
70 collections.is_none_or(|allowed| allowed.contains(&object.key.collection))
71 })
72 .cloned()
73 .collect();
74
75 Ok(objects)
76 }
77
78 fn delete_object(&mut self, collection: &str, id: &str) -> ThingdResult<bool> {
79 Ok(self
80 .objects
81 .remove(&ObjectKey::new(collection, id))
82 .is_some())
83 }
84
85 fn count_objects(&self) -> ThingdResult<u64> {
86 Ok(self.objects.len() as u64)
87 }
88
89 fn list_collections(&self) -> ThingdResult<Vec<String>> {
90 let mut collections: Vec<String> = self
91 .objects
92 .keys()
93 .map(|key| key.collection.clone())
94 .collect();
95 collections.sort();
96 collections.dedup();
97 Ok(collections)
98 }
99}
100
101impl EventLog for MemoryEngine {
102 fn append_event(&mut self, mut event: MemoryEvent) -> ThingdResult<MemoryEvent> {
103 self.next_event_sequence += 1;
104 event.sequence = self.next_event_sequence;
105 if event.created_at.is_empty() {
106 event.created_at = now_iso_string();
107 }
108 self.events.push(event.clone());
109
110 Ok(event)
111 }
112
113 fn list_events(
114 &self,
115 stream: Option<&str>,
116 options: ListEventsOptions,
117 ) -> ThingdResult<Vec<MemoryEvent>> {
118 let events = self
119 .events
120 .iter()
121 .filter(|event| stream.is_none_or(|target| event.stream == target))
122 .filter(|event| options.from_sequence.is_none_or(|seq| event.sequence > seq))
123 .cloned()
124 .collect::<Vec<_>>();
125
126 Ok(match options.limit {
127 Some(limit) => events
128 .into_iter()
129 .take(usize::try_from(limit).unwrap_or(usize::MAX))
130 .collect(),
131 None => events,
132 })
133 }
134
135 fn count_events(&self) -> ThingdResult<u64> {
136 Ok(self.events.len() as u64)
137 }
138
139 fn list_streams(&self) -> ThingdResult<Vec<String>> {
140 let mut streams: Vec<String> = self
141 .events
142 .iter()
143 .map(|event| event.stream.clone())
144 .collect();
145 streams.sort();
146 streams.dedup();
147 Ok(streams)
148 }
149}
150
151impl QueueStore for MemoryEngine {
152 fn push_job(&mut self, job: QueueJob) -> ThingdResult<QueueJob> {
153 let jobs = self.queues.entry(job.queue.clone()).or_default();
154
155 if let Some(existing) = jobs.iter().find(|candidate| candidate.id == job.id) {
156 return Ok(existing.clone());
157 }
158
159 jobs.push_back(job.clone());
160 Ok(job)
161 }
162
163 fn claim_job_with_options(
164 &mut self,
165 queue: &str,
166 options: QueueClaimOptions,
167 ) -> ThingdResult<Option<QueueJob>> {
168 self.release_expired_leases(queue);
169
170 let Some(jobs) = self.queues.get_mut(queue) else {
171 return Ok(None);
172 };
173
174 let now = unix_timestamp_millis();
175 let Some(job) = jobs.iter_mut().find(|candidate| {
176 candidate.status == QueueJobStatus::Ready && candidate.available_at_ms <= now
177 }) else {
178 return Ok(None);
179 };
180
181 job.status = QueueJobStatus::Leased;
182 job.attempts += 1;
183 job.leased_at_ms = Some(now);
184 job.lease_expires_at_ms = Some(now.saturating_add(u64_to_i64(options.lease_ms)));
185
186 Ok(Some(job.clone()))
187 }
188
189 fn ack_job(&mut self, queue: &str, id: &str) -> ThingdResult<Option<QueueJob>> {
190 let Some(job) = self.find_job_mut(queue, id) else {
191 return Ok(None);
192 };
193
194 if job.status != QueueJobStatus::Leased {
195 return Err(ThingdError::Conflict(format!(
196 "job {id} must be leased before ack"
197 )));
198 }
199
200 job.status = QueueJobStatus::Completed;
201 job.completed_at_ms = Some(unix_timestamp_millis());
202
203 Ok(Some(job.clone()))
204 }
205
206 fn nack_job_with_options(
207 &mut self,
208 queue: &str,
209 id: &str,
210 options: QueueNackOptions,
211 ) -> ThingdResult<Option<QueueJob>> {
212 let Some(job) = self.find_job_mut(queue, id) else {
213 return Ok(None);
214 };
215
216 if job.status != QueueJobStatus::Leased {
217 return Err(ThingdError::Conflict(format!(
218 "job {id} must be leased before nack"
219 )));
220 }
221
222 let now = unix_timestamp_millis();
223 job.leased_at_ms = None;
224 job.lease_expires_at_ms = None;
225 if !options.error.is_empty() {
226 job.last_error = options.error;
227 }
228 job.status = if job.attempts >= job.max_attempts {
229 job.dead_at_ms = Some(now);
230 QueueJobStatus::Dead
231 } else {
232 job.available_at_ms = now.saturating_add(u64_to_i64(options.delay_ms));
233 QueueJobStatus::Ready
234 };
235
236 Ok(Some(job.clone()))
237 }
238
239 fn list_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
240 Ok(self
241 .queues
242 .get(queue)
243 .map_or_else(Vec::new, |jobs| jobs.iter().cloned().collect()))
244 }
245
246 fn list_dead_jobs(&self, queue: &str) -> ThingdResult<Vec<QueueJob>> {
247 Ok(self.queues.get(queue).map_or_else(Vec::new, |jobs| {
248 jobs.iter()
249 .filter(|job| job.status == QueueJobStatus::Dead)
250 .cloned()
251 .collect()
252 }))
253 }
254
255 fn list_queues(&self) -> ThingdResult<Vec<String>> {
256 let mut queues: Vec<String> = self.queues.keys().cloned().collect();
257 queues.sort();
258 Ok(queues)
259 }
260
261 fn count_active_jobs(&self) -> ThingdResult<u64> {
262 let count = self
263 .queues
264 .values()
265 .flat_map(|jobs| jobs.iter())
266 .filter(|job| job.status != QueueJobStatus::Dead)
267 .count();
268 Ok(count as u64)
269 }
270
271 fn count_dead_jobs(&self) -> ThingdResult<u64> {
272 let count = self
273 .queues
274 .values()
275 .flat_map(|jobs| jobs.iter())
276 .filter(|job| job.status == QueueJobStatus::Dead)
277 .count();
278 Ok(count as u64)
279 }
280}
281
282impl MemoryEngine {
283 fn find_job_mut(&mut self, queue: &str, id: &str) -> Option<&mut QueueJob> {
284 self.queues
285 .get_mut(queue)?
286 .iter_mut()
287 .find(|job| job.id == id)
288 }
289
290 fn release_expired_leases(&mut self, queue: &str) {
291 let now = unix_timestamp_millis();
292
293 for job in self.queues.get_mut(queue).into_iter().flatten() {
294 if job.status == QueueJobStatus::Leased
295 && job
296 .lease_expires_at_ms
297 .is_some_and(|lease_expires_at_ms| lease_expires_at_ms <= now)
298 {
299 job.status = QueueJobStatus::Ready;
300 job.leased_at_ms = None;
301 job.lease_expires_at_ms = None;
302 }
303 }
304 }
305}
306
307impl crate::store::Searcher for MemoryEngine {
308 fn search(
309 &self,
310 query: &str,
311 options: crate::SearchOptions,
312 ) -> ThingdResult<Vec<crate::SearchHit>> {
313 let query_words: Vec<String> = query
314 .split_whitespace()
315 .map(|w| {
316 w.to_lowercase()
317 .chars()
318 .filter(|c| c.is_alphanumeric())
319 .collect()
320 })
321 .filter(|w: &String| !w.is_empty())
322 .collect();
323
324 if query_words.is_empty() {
325 return Ok(Vec::new());
326 }
327
328 let mut hits = Vec::new();
329
330 for object in self.objects.values() {
332 if let Some(ref collections) = options.collections
334 && !collections.contains(&object.key.collection)
335 {
336 continue;
337 }
338
339 if let Some(ref filter) = options.filter
341 && !matches_filter_memory(&object.body, filter)
342 {
343 continue;
344 }
345
346 let text_to_search = format!(
347 "{} {} {}",
348 object.key.collection, object.key.id, object.body
349 )
350 .to_lowercase();
351 let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
352
353 if matches_all {
354 hits.push(crate::SearchHit {
355 kind: "object".to_string(),
356 collection: object.key.collection.clone(),
357 id: object.key.id.clone(),
358 text: object.body.clone(),
359 score: 1.0,
360 body: object.body.clone(),
361 version: Some(object.version),
362 created_at: object.created_at.clone(),
363 updated_at: Some(object.updated_at.clone()),
364 event_type: None,
365 });
366 }
367 }
368
369 for event in &self.events {
371 if let Some(ref collections) = options.collections
373 && !collections.contains(&event.stream)
374 {
375 continue;
376 }
377
378 if let Some(ref filter) = options.filter
380 && !matches_filter_memory(&event.body, filter)
381 {
382 continue;
383 }
384
385 let text_to_search =
386 format!("{} {} {}", event.stream, event.event_type, event.body).to_lowercase();
387 let matches_all = query_words.iter().all(|word| text_to_search.contains(word));
388
389 if matches_all {
390 hits.push(crate::SearchHit {
391 kind: "event".to_string(),
392 collection: event.stream.clone(),
393 id: event.sequence.to_string(),
394 text: event.body.clone(),
395 score: 1.0,
396 body: event.body.clone(),
397 version: None,
398 created_at: event.created_at.clone(),
399 updated_at: None,
400 event_type: Some(event.event_type.clone()),
401 });
402 }
403 }
404
405 if let Some(limit) = options.limit {
407 hits.truncate(limit);
408 }
409
410 Ok(hits)
411 }
412}
413
414impl LinkStore for MemoryEngine {
415 fn create_link(&mut self, mut link: Link) -> ThingdResult<Link> {
416 let id = format!("link-{}", self.links.len() + 1);
417 link.id = id;
418 if link.created_at.is_empty() {
419 link.created_at = now_iso_string();
420 }
421 self.links.push(link.clone());
422 Ok(link)
423 }
424
425 fn delete_link(&mut self, id: &str) -> ThingdResult<bool> {
426 let len_before = self.links.len();
427 self.links.retain(|l| l.id != id);
428 Ok(self.links.len() < len_before)
429 }
430
431 fn get_link(&self, id: &str) -> ThingdResult<Option<Link>> {
432 Ok(self.links.iter().find(|l| l.id == id).cloned())
433 }
434
435 fn get_neighbors(
436 &self,
437 reference: &str,
438 direction: LinkDirection,
439 options: LinkQueryOptions,
440 ) -> ThingdResult<Vec<Link>> {
441 let neighbors: Vec<Link> = self
442 .links
443 .iter()
444 .filter(|link| {
445 let matches_direction = match direction {
446 LinkDirection::Outgoing => link.from_ref == reference,
447 LinkDirection::Incoming => link.to_ref == reference,
448 LinkDirection::Both => link.from_ref == reference || link.to_ref == reference,
449 };
450 let matches_type = options
451 .link_type
452 .as_deref()
453 .is_none_or(|t| link.link_type == t);
454 matches_direction && matches_type
455 })
456 .cloned()
457 .collect();
458
459 Ok(match options.limit {
460 Some(limit) => neighbors.into_iter().take(limit).collect(),
461 None => neighbors,
462 })
463 }
464
465 fn count_links(&self) -> ThingdResult<u64> {
466 Ok(self.links.len() as u64)
467 }
468}
469
470fn matches_filter_memory(body_str: &str, filter: &serde_json::Value) -> bool {
471 let Ok(body) = serde_json::from_str::<serde_json::Value>(body_str) else {
472 return false;
473 };
474
475 let Some(filter_obj) = filter.as_object() else {
476 return true;
477 };
478
479 for (k, v) in filter_obj {
480 if body.get(k) != Some(v) {
481 return false;
482 }
483 }
484 true
485}
486
#[cfg(test)]
mod tests {
    use super::*;
    use crate::SearchOptions;
    use crate::store::Searcher;

    /// A stored object starts at version 1 and can be read back by its key.
    #[test]
    fn stores_and_reads_objects() {
        let mut engine = MemoryEngine::new();

        let written = engine
            .put_object(MemoryObject::new(
                "decisions",
                "rust-core",
                "{\"text\":\"Use Rust\"}",
            ))
            .unwrap();
        assert_eq!(written.version, 1);

        let read_back = engine
            .get_object("decisions", "rust-core")
            .unwrap()
            .unwrap();
        assert_eq!(read_back.key.collection, "decisions");
        assert_eq!(read_back.key.id, "rust-core");
    }

    /// Listing returns everything without a filter and only the requested
    /// collections with one.
    #[test]
    fn lists_objects_with_optional_collection_filter() {
        let mut engine = MemoryEngine::new();

        for (collection, id) in [("decisions", "rust-core"), ("notes", "agent-guide")] {
            engine
                .put_object(MemoryObject::new(collection, id, "{}"))
                .unwrap();
        }

        let everything = engine.list_objects(None).unwrap();
        assert_eq!(everything.len(), 2);

        let only_decisions = engine
            .list_objects(Some(&["decisions".to_string()]))
            .unwrap();
        assert_eq!(only_decisions.len(), 1);
        assert_eq!(only_decisions[0].key.collection, "decisions");
    }

    /// The first appended event gets sequence 1 and is listed for its stream.
    #[test]
    fn appends_events_with_sequence_numbers() {
        let mut engine = MemoryEngine::new();

        let appended = engine
            .append_event(MemoryEvent::new(
                "project:thingd",
                "decision.made",
                "MCP-native object storage",
            ))
            .unwrap();
        assert_eq!(appended.sequence, 1);

        let listed = engine
            .list_events(Some("project:thingd"), ListEventsOptions::default())
            .unwrap();
        assert_eq!(listed.len(), 1);
    }

    /// Claiming leases the job and counts an attempt; acking completes it.
    #[test]
    fn claims_and_acks_queue_jobs() {
        let mut engine = MemoryEngine::new();
        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        let leased = engine.claim_job("embed").unwrap().unwrap();
        assert_eq!(leased.status, QueueJobStatus::Leased);
        assert_eq!(leased.attempts, 1);

        let completed = engine.ack_job("embed", "job-1").unwrap().unwrap();
        assert_eq!(completed.status, QueueJobStatus::Completed);
    }

    /// A job with a single allowed attempt is dead-lettered by its first nack.
    #[test]
    fn nacks_jobs_to_dead_letter_after_max_attempts() {
        let mut engine = MemoryEngine::new();
        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 1))
            .unwrap();
        engine.claim_job("embed").unwrap().unwrap();

        let rejected = engine.nack_job("embed", "job-1").unwrap().unwrap();
        assert_eq!(rejected.status, QueueJobStatus::Dead);

        let dead_letters = engine.list_dead_jobs("embed").unwrap();
        assert_eq!(dead_letters.len(), 1);
    }

    /// A job pushed with a delay cannot be claimed before the delay elapses.
    #[test]
    fn does_not_claim_delayed_jobs_before_available() {
        let mut engine = MemoryEngine::new();
        let delayed = QueueJob::new("embed", "job-1", "doc-1", 3).delay_by_ms(60_000);
        engine.push_job(delayed).unwrap();

        let claim = engine.claim_job("embed").unwrap();
        assert!(claim.is_none());
    }

    /// A zero-length lease is already expired on the next claim, so the job
    /// is handed out a second time.
    #[test]
    fn reclaims_jobs_after_lease_expiration() {
        let mut engine = MemoryEngine::new();
        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();

        let first_claim = engine
            .claim_job_with_options("embed", QueueClaimOptions::new(0))
            .unwrap()
            .unwrap();
        assert_eq!(first_claim.status, QueueJobStatus::Leased);

        let second_claim = engine.claim_job("embed").unwrap().unwrap();
        assert_eq!(second_claim.status, QueueJobStatus::Leased);
        assert_eq!(second_claim.attempts, 2);
    }

    /// A nack with a retry delay makes the job ready again, but not claimable
    /// until the delay has passed.
    #[test]
    fn nacks_jobs_with_retry_delay() {
        let mut engine = MemoryEngine::new();
        engine
            .push_job(QueueJob::new("embed", "job-1", "doc-1", 3))
            .unwrap();
        engine.claim_job("embed").unwrap().unwrap();

        let requeued = engine
            .nack_job_with_options("embed", "job-1", QueueNackOptions::new(60_000))
            .unwrap()
            .unwrap();
        assert_eq!(requeued.status, QueueJobStatus::Ready);

        let claim = engine.claim_job("embed").unwrap();
        assert!(claim.is_none());
    }

    /// The counters start at zero and follow objects, events and jobs.
    #[test]
    fn counts_objects_events_and_jobs() {
        let mut engine = MemoryEngine::new();

        assert_eq!(engine.count_objects().unwrap(), 0);
        assert_eq!(engine.count_events().unwrap(), 0);
        assert_eq!(engine.count_active_jobs().unwrap(), 0);
        assert_eq!(engine.count_dead_jobs().unwrap(), 0);

        let objects = [
            ("col-a", "o1", r#"{"v":1}"#),
            ("col-a", "o2", r#"{"v":2}"#),
            ("col-b", "o3", r#"{"v":3}"#),
        ];
        for (collection, id, body) in objects {
            engine
                .put_object(MemoryObject::new(collection, id, body))
                .unwrap();
        }
        assert_eq!(engine.count_objects().unwrap(), 3);

        let events = [("s1", "t1", "e1"), ("s1", "t2", "e2"), ("s2", "t3", "e3")];
        for (stream, event_type, body) in events {
            engine
                .append_event(MemoryEvent::new(stream, event_type, body))
                .unwrap();
        }
        assert_eq!(engine.count_events().unwrap(), 3);

        let jobs = [
            ("work", "j1", "p1", 3),
            ("work", "j2", "p2", 3),
            ("other", "j3", "p3", 1),
        ];
        for (queue, id, payload, max_attempts) in jobs {
            engine
                .push_job(QueueJob::new(queue, id, payload, max_attempts))
                .unwrap();
        }
        assert_eq!(engine.count_active_jobs().unwrap(), 3);

        // The only job of "other" allows one attempt, so its nack kills it.
        engine.claim_job("other").unwrap();
        engine.nack_job("other", "j3").unwrap();
        assert_eq!(engine.count_dead_jobs().unwrap(), 1);
        assert_eq!(engine.count_active_jobs().unwrap(), 2);
    }

    /// Collection, stream and queue names are listed sorted and without
    /// duplicates.
    #[test]
    fn lists_collections_streams_and_queues() {
        let mut engine = MemoryEngine::new();

        assert!(engine.list_collections().unwrap().is_empty());
        assert!(engine.list_streams().unwrap().is_empty());
        assert!(engine.list_queues().unwrap().is_empty());

        for (collection, id) in [("col-a", "x"), ("col-b", "y"), ("col-a", "z")] {
            engine
                .put_object(MemoryObject::new(collection, id, "{}"))
                .unwrap();
        }
        let collection_names = engine.list_collections().unwrap();
        assert_eq!(collection_names, vec!["col-a", "col-b"]);

        for (stream, body) in [("s1", "e1"), ("s2", "e2")] {
            engine
                .append_event(MemoryEvent::new(stream, "t", body))
                .unwrap();
        }
        let stream_names = engine.list_streams().unwrap();
        assert_eq!(stream_names, vec!["s1", "s2"]);

        for (queue, id, payload) in [("work", "j1", "p1"), ("jobs", "j2", "p2")] {
            engine
                .push_job(QueueJob::new(queue, id, payload, 3))
                .unwrap();
        }
        let queue_names = engine.list_queues().unwrap();
        assert_eq!(queue_names, vec!["jobs", "work"]);
    }

    /// Search honours the result limit and the collection restriction.
    #[test]
    fn search_respects_filter_and_limit() {
        let mut engine = MemoryEngine::new();

        let documents = [
            ("a", r#"{"text":"hello world","tag":"greeting"}"#),
            ("b", r#"{"text":"hello there","tag":"greeting"}"#),
            ("c", r#"{"text":"goodbye world","tag":"farewell"}"#),
        ];
        for (id, body) in documents {
            engine
                .put_object(MemoryObject::new("docs", id, body))
                .unwrap();
        }

        let unlimited = engine.search("world", SearchOptions::default()).unwrap();
        assert_eq!(unlimited.len(), 2);

        let single_hit = SearchOptions {
            limit: Some(1),
            ..Default::default()
        };
        let limited = engine.search("world", single_hit).unwrap();
        assert_eq!(limited.len(), 1);

        let docs_only = SearchOptions {
            collections: Some(vec!["docs".into()]),
            ..Default::default()
        };
        let filtered = engine.search("hello", docs_only).unwrap();
        assert_eq!(filtered.len(), 2);
    }
}