Skip to main content

everruns_core/
memory.rs

1// In-memory implementations for examples and testing
2//
3// These implementations keep all data in memory, making them perfect for:
4// - Standalone examples that don't need a database
5// - Unit tests
6// - Quick prototyping
7
8use crate::agent::Agent;
9use crate::harness::Harness;
10use crate::llm_models::LlmProviderType;
11use crate::session::Session;
12use crate::tool_types::{ToolCall, ToolDefinition, ToolResult};
13use crate::traits::ModelWithProvider;
14use crate::typed_id::{AgentId, EventId, HarnessId, MessageId, ModelId, SessionId};
15use async_trait::async_trait;
16use std::collections::HashMap;
17use std::sync::Arc;
18use tokio::sync::RwLock;
19use uuid::Uuid;
20
21use crate::error::Result;
22use crate::message::Message;
23use crate::message_filter::MessageQuery;
24use crate::message_retriever::{InputMessage, MessageRetriever};
25use crate::traits::{AgentStore, HarnessStore, LlmProviderStore, SessionStore, ToolExecutor};
26use chrono::Utc;
27
28// ============================================================================
29// InMemoryMessageRetriever - In-memory message storage for testing
30// ============================================================================
31
32/// In-memory message retriever
33///
34/// Stores messages in a HashMap keyed by session ID.
35/// Implements the `MessageRetriever` trait for retrieval operations.
36///
37/// Note: Write operations (add, store) are provided as inherent methods
38/// for testing purposes. In production, messages are stored via EventEmitter.
39#[derive(Debug, Default, Clone)]
40pub struct InMemoryMessageRetriever {
41    messages: Arc<RwLock<HashMap<SessionId, Vec<Message>>>>,
42}
43
44impl InMemoryMessageRetriever {
45    /// Create a new in-memory message retriever
46    pub fn new() -> Self {
47        Self {
48            messages: Arc::new(RwLock::new(HashMap::new())),
49        }
50    }
51
52    /// Get all sessions
53    pub async fn sessions(&self) -> Vec<SessionId> {
54        self.messages.read().await.keys().copied().collect()
55    }
56
57    /// Clear all messages
58    pub async fn clear(&self) {
59        self.messages.write().await.clear();
60    }
61
62    /// Clear messages for a specific session
63    pub async fn clear_session(&self, session_id: SessionId) {
64        self.messages.write().await.remove(&session_id);
65    }
66
67    /// Pre-populate with messages (useful for testing)
68    pub async fn seed(&self, session_id: SessionId, messages: Vec<Message>) {
69        self.messages.write().await.insert(session_id, messages);
70    }
71
72    /// Add a new message and return it with generated ID (for testing)
73    ///
74    /// Note: In production, messages are stored via EventService.
75    /// This method is provided for test setup and in-memory usage.
76    pub async fn add(&self, session_id: SessionId, input: InputMessage) -> Result<Message> {
77        let message = Message {
78            id: MessageId::new(),
79            role: input.role,
80            content: input.content,
81            phase: None,
82            thinking: None, // InputMessage doesn't include thinking (user messages don't have thinking)
83            thinking_signature: None,
84            controls: input.controls,
85            metadata: input.metadata,
86            external_actor: None,
87            created_at: Utc::now(),
88        };
89
90        self.messages
91            .write()
92            .await
93            .entry(session_id)
94            .or_default()
95            .push(message.clone());
96
97        Ok(message)
98    }
99
100    /// Store an existing message (for testing)
101    ///
102    /// Note: In production, messages are stored via EventEmitter.
103    /// This method is provided for test setup and in-memory usage.
104    pub async fn store(&self, session_id: SessionId, message: Message) -> Result<()> {
105        self.messages
106            .write()
107            .await
108            .entry(session_id)
109            .or_default()
110            .push(message);
111        Ok(())
112    }
113}
114
115#[async_trait]
116impl MessageRetriever for InMemoryMessageRetriever {
117    async fn get(&self, session_id: SessionId, message_id: MessageId) -> Result<Option<Message>> {
118        Ok(self
119            .messages
120            .read()
121            .await
122            .get(&session_id)
123            .and_then(|messages| messages.iter().find(|m| m.id == message_id).cloned()))
124    }
125
126    async fn load(&self, session_id: SessionId) -> Result<Vec<Message>> {
127        Ok(self
128            .messages
129            .read()
130            .await
131            .get(&session_id)
132            .cloned()
133            .unwrap_or_default())
134    }
135
136    async fn load_filtered(&self, query: MessageQuery) -> Result<Vec<Message>> {
137        use crate::message_filter::MessageFilter;
138
139        let mut messages = self.load(query.session_id).await?;
140
141        // Apply filters
142        for filter in &query.filters {
143            match filter {
144                MessageFilter::TimeRange { from, to } => {
145                    messages.retain(|m| {
146                        let after_from = from.is_none_or(|t| m.created_at >= t);
147                        let before_to = to.is_none_or(|t| m.created_at <= t);
148                        after_from && before_to
149                    });
150                }
151                MessageFilter::Search(q) => {
152                    let q_lower = q.to_lowercase();
153                    messages.retain(|m| {
154                        m.text()
155                            .is_some_and(|t| t.to_lowercase().contains(&q_lower))
156                    });
157                }
158                MessageFilter::Custom(predicate) => {
159                    messages.retain(|m| predicate(m));
160                }
161                // Other filters not commonly used in-memory
162                _ => {}
163            }
164        }
165
166        query.apply_windowing(&mut messages);
167
168        // Apply injections
169        if query.has_injections() {
170            query.apply_injections(&mut messages);
171        }
172
173        Ok(messages)
174    }
175
176    async fn count(&self, session_id: SessionId) -> Result<usize> {
177        Ok(self
178            .messages
179            .read()
180            .await
181            .get(&session_id)
182            .map(|m| m.len())
183            .unwrap_or(0))
184    }
185}
186
187// ============================================================================
188// InMemoryAgentStore - Stores agents in memory
189// ============================================================================
190
191/// In-memory agent store
192///
193/// Stores agents in a HashMap keyed by agent ID.
194/// Useful for testing and examples where you want to configure agents without a database.
195#[derive(Debug, Default, Clone)]
196pub struct InMemoryAgentStore {
197    agents: Arc<RwLock<HashMap<AgentId, Agent>>>,
198}
199
200impl InMemoryAgentStore {
201    /// Create a new in-memory agent store
202    pub fn new() -> Self {
203        Self {
204            agents: Arc::new(RwLock::new(HashMap::new())),
205        }
206    }
207
208    /// Add an agent to the store
209    pub async fn add_agent(&self, agent: Agent) {
210        self.agents.write().await.insert(agent.public_id, agent);
211    }
212
213    /// Get all agent IDs
214    pub async fn agent_ids(&self) -> Vec<AgentId> {
215        self.agents.read().await.keys().copied().collect()
216    }
217
218    /// Clear all agents
219    pub async fn clear(&self) {
220        self.agents.write().await.clear();
221    }
222}
223
224#[async_trait]
225impl AgentStore for InMemoryAgentStore {
226    async fn get_agent(&self, agent_id: AgentId) -> Result<Option<Agent>> {
227        Ok(self.agents.read().await.get(&agent_id).cloned())
228    }
229}
230
231// ============================================================================
232// InMemoryHarnessStore - Stores harnesses in memory
233// ============================================================================
234
235/// In-memory harness store
236///
237/// Stores harnesses in a HashMap keyed by harness ID.
238/// Useful for testing and examples where you want to configure harnesses without a database.
239#[derive(Debug, Default, Clone)]
240pub struct InMemoryHarnessStore {
241    harnesses: Arc<RwLock<HashMap<HarnessId, Harness>>>,
242}
243
244impl InMemoryHarnessStore {
245    /// Create a new in-memory harness store
246    pub fn new() -> Self {
247        Self {
248            harnesses: Arc::new(RwLock::new(HashMap::new())),
249        }
250    }
251
252    /// Add a harness to the store
253    pub async fn add_harness(&self, harness: Harness) {
254        self.harnesses.write().await.insert(harness.id, harness);
255    }
256}
257
258#[async_trait]
259impl HarnessStore for InMemoryHarnessStore {
260    async fn get_harness_chain(&self, harness_id: HarnessId) -> Result<Vec<Harness>> {
261        Ok(self
262            .harnesses
263            .read()
264            .await
265            .get(&harness_id)
266            .cloned()
267            .into_iter()
268            .collect())
269    }
270}
271
272// ============================================================================
273// InMemorySessionStore - Stores sessions in memory
274// ============================================================================
275
276/// In-memory session store
277///
278/// Stores sessions in a HashMap keyed by session ID.
279/// Useful for testing and examples where you want to configure sessions without a database.
280#[derive(Debug, Default, Clone)]
281pub struct InMemorySessionStore {
282    sessions: Arc<RwLock<HashMap<SessionId, Session>>>,
283}
284
285impl InMemorySessionStore {
286    /// Create a new in-memory session store
287    pub fn new() -> Self {
288        Self {
289            sessions: Arc::new(RwLock::new(HashMap::new())),
290        }
291    }
292
293    /// Add a session to the store
294    pub async fn add_session(&self, session: Session) {
295        self.sessions.write().await.insert(session.id, session);
296    }
297
298    /// Get all session IDs
299    pub async fn session_ids(&self) -> Vec<SessionId> {
300        self.sessions.read().await.keys().copied().collect()
301    }
302
303    /// Clear all sessions
304    pub async fn clear(&self) {
305        self.sessions.write().await.clear();
306    }
307}
308
309#[async_trait]
310impl SessionStore for InMemorySessionStore {
311    async fn get_session(&self, session_id: SessionId) -> Result<Option<Session>> {
312        Ok(self.sessions.read().await.get(&session_id).cloned())
313    }
314}
315
316// ============================================================================
317// InMemoryLlmProviderStore - Stores LLM provider configurations in memory
318// ============================================================================
319
320/// In-memory LLM provider store
321///
322/// Stores model configurations in a HashMap keyed by model UUID.
323/// Useful for testing and examples where you want to configure providers without a database.
324///
325/// # Example
326///
327/// ```ignore
328/// use everruns_core::memory::InMemoryLlmProviderStore;
329/// use everruns_core::llm_entities::LlmProviderType;
330///
331/// let store = InMemoryLlmProviderStore::from_env().await;
332/// // Uses OPENAI_API_KEY or ANTHROPIC_API_KEY from environment
333/// ```
334#[derive(Debug, Default, Clone)]
335pub struct InMemoryLlmProviderStore {
336    models: Arc<RwLock<HashMap<ModelId, ModelWithProvider>>>,
337    default_model: Arc<RwLock<Option<ModelWithProvider>>>,
338}
339
340impl InMemoryLlmProviderStore {
341    /// Create a new empty in-memory provider store
342    pub fn new() -> Self {
343        Self {
344            models: Arc::new(RwLock::new(HashMap::new())),
345            default_model: Arc::new(RwLock::new(None)),
346        }
347    }
348
349    /// Create a provider store from environment variables
350    ///
351    /// Checks for OPENAI_API_KEY or ANTHROPIC_API_KEY and configures
352    /// a default model accordingly.
353    pub async fn from_env() -> Self {
354        let store = Self::new();
355
356        // Check for OpenAI first
357        if let Ok(api_key) = std::env::var("OPENAI_API_KEY") {
358            let model = ModelWithProvider {
359                model: "gpt-5.4".to_string(),
360                provider_type: LlmProviderType::Openai,
361                api_key: Some(api_key),
362                base_url: std::env::var("OPENAI_BASE_URL").ok(),
363            };
364            store.set_default_model(model).await;
365        } else if let Ok(api_key) = std::env::var("ANTHROPIC_API_KEY") {
366            let model = ModelWithProvider {
367                model: "claude-sonnet-4-20250514".to_string(),
368                provider_type: LlmProviderType::Anthropic,
369                api_key: Some(api_key),
370                base_url: std::env::var("ANTHROPIC_BASE_URL").ok(),
371            };
372            store.set_default_model(model).await;
373        }
374
375        store
376    }
377
378    /// Create a provider store with a specific default model
379    pub async fn with_default(model: ModelWithProvider) -> Self {
380        let store = Self::new();
381        store.set_default_model(model).await;
382        store
383    }
384
385    /// Add a model to the store
386    pub async fn add_model(&self, model_id: ModelId, model: ModelWithProvider) {
387        self.models.write().await.insert(model_id, model);
388    }
389
390    /// Set the default model
391    pub async fn set_default_model(&self, model: ModelWithProvider) {
392        *self.default_model.write().await = Some(model);
393    }
394
395    /// Clear all models
396    pub async fn clear(&self) {
397        self.models.write().await.clear();
398        *self.default_model.write().await = None;
399    }
400}
401
402#[async_trait]
403impl LlmProviderStore for InMemoryLlmProviderStore {
404    async fn get_model_with_provider(
405        &self,
406        model_id: ModelId,
407    ) -> Result<Option<ModelWithProvider>> {
408        Ok(self.models.read().await.get(&model_id).cloned())
409    }
410
411    async fn get_default_model(&self) -> Result<Option<ModelWithProvider>> {
412        Ok(self.default_model.read().await.clone())
413    }
414}
415
416// ============================================================================
417// MockToolExecutor - Returns predefined results
418// ============================================================================
419
420/// Mock tool executor for testing
421///
422/// Returns predefined results based on tool name.
423#[derive(Debug, Default)]
424pub struct MockToolExecutor {
425    results: Arc<RwLock<HashMap<String, serde_json::Value>>>,
426    call_log: Arc<RwLock<Vec<ToolCall>>>,
427}
428
429impl MockToolExecutor {
430    /// Create a new mock tool executor
431    pub fn new() -> Self {
432        Self {
433            results: Arc::new(RwLock::new(HashMap::new())),
434            call_log: Arc::new(RwLock::new(Vec::new())),
435        }
436    }
437
438    /// Set the result for a specific tool
439    pub async fn set_result(&self, tool_name: impl Into<String>, result: serde_json::Value) {
440        self.results.write().await.insert(tool_name.into(), result);
441    }
442
443    /// Get the call log
444    pub async fn calls(&self) -> Vec<ToolCall> {
445        self.call_log.read().await.clone()
446    }
447
448    /// Clear the call log
449    pub async fn clear_calls(&self) {
450        self.call_log.write().await.clear();
451    }
452}
453
454#[async_trait]
455impl ToolExecutor for MockToolExecutor {
456    async fn execute(
457        &self,
458        tool_call: &ToolCall,
459        _tool_def: &ToolDefinition,
460    ) -> Result<ToolResult> {
461        // Log the call
462        self.call_log.write().await.push(tool_call.clone());
463
464        // Return predefined result or default
465        let result = self
466            .results
467            .read()
468            .await
469            .get(&tool_call.name)
470            .cloned()
471            .unwrap_or_else(|| serde_json::json!({"status": "ok"}));
472
473        Ok(ToolResult {
474            tool_call_id: tool_call.id.clone(),
475            result: Some(result),
476            images: None,
477            error: None,
478            connection_required: None,
479            raw_output: None,
480        })
481    }
482}
483
484// ============================================================================
485// EchoToolExecutor - Echoes back the arguments
486// ============================================================================
487
488/// Tool executor that echoes back the arguments
489///
490/// Useful for simple testing without setting up mock results.
491#[derive(Debug, Default, Clone, Copy)]
492pub struct EchoToolExecutor;
493
494impl EchoToolExecutor {
495    pub fn new() -> Self {
496        Self
497    }
498}
499
500#[async_trait]
501impl ToolExecutor for EchoToolExecutor {
502    async fn execute(
503        &self,
504        tool_call: &ToolCall,
505        _tool_def: &ToolDefinition,
506    ) -> Result<ToolResult> {
507        Ok(ToolResult {
508            tool_call_id: tool_call.id.clone(),
509            result: Some(serde_json::json!({
510                "echoed_tool": tool_call.name,
511                "echoed_arguments": tool_call.arguments
512            })),
513            images: None,
514            error: None,
515            connection_required: None,
516            raw_output: None,
517        })
518    }
519}
520
521// ============================================================================
522// FailingToolExecutor - Always returns an error
523// ============================================================================
524
525/// Tool executor that always fails
526///
527/// Useful for testing error handling.
528#[derive(Debug, Clone)]
529pub struct FailingToolExecutor {
530    error_message: String,
531}
532
533impl FailingToolExecutor {
534    pub fn new(error_message: impl Into<String>) -> Self {
535        Self {
536            error_message: error_message.into(),
537        }
538    }
539}
540
541impl Default for FailingToolExecutor {
542    fn default() -> Self {
543        Self::new("Tool execution failed")
544    }
545}
546
547#[async_trait]
548impl ToolExecutor for FailingToolExecutor {
549    async fn execute(
550        &self,
551        tool_call: &ToolCall,
552        _tool_def: &ToolDefinition,
553    ) -> Result<ToolResult> {
554        Ok(ToolResult {
555            tool_call_id: tool_call.id.clone(),
556            result: None,
557            images: None,
558            error: Some(self.error_message.clone()),
559            connection_required: None,
560            raw_output: None,
561        })
562    }
563}
564
565// ============================================================================
566// MockLlmProvider - Returns predefined responses
567// ============================================================================
568
569use crate::events::{Event, EventRequest};
570use crate::llm_driver_registry::{
571    LlmCallConfig, LlmDriver, LlmMessage, LlmResponseStream, LlmStreamEvent,
572};
573use crate::traits::EventEmitter;
574use futures::stream;
575
576/// Mock LLM provider for testing
577///
578/// Returns predefined responses in sequence.
579#[derive(Debug, Default)]
580pub struct MockLlmProvider {
581    responses: Arc<RwLock<Vec<MockLlmResponse>>>,
582    call_index: Arc<RwLock<usize>>,
583    call_log: Arc<RwLock<Vec<Vec<LlmMessage>>>>,
584}
585
586/// A mock LLM response
587#[derive(Debug, Clone)]
588pub struct MockLlmResponse {
589    pub text: String,
590    pub tool_calls: Option<Vec<ToolCall>>,
591}
592
593impl MockLlmResponse {
594    /// Create a text-only response
595    pub fn text(text: impl Into<String>) -> Self {
596        Self {
597            text: text.into(),
598            tool_calls: None,
599        }
600    }
601
602    /// Create a response with tool calls
603    pub fn with_tools(text: impl Into<String>, tool_calls: Vec<ToolCall>) -> Self {
604        Self {
605            text: text.into(),
606            tool_calls: Some(tool_calls),
607        }
608    }
609}
610
611impl MockLlmProvider {
612    /// Create a new mock LLM provider
613    pub fn new() -> Self {
614        Self {
615            responses: Arc::new(RwLock::new(Vec::new())),
616            call_index: Arc::new(RwLock::new(0)),
617            call_log: Arc::new(RwLock::new(Vec::new())),
618        }
619    }
620
621    /// Add a response to the queue
622    pub async fn add_response(&self, response: MockLlmResponse) {
623        self.responses.write().await.push(response);
624    }
625
626    /// Set all responses at once
627    pub async fn set_responses(&self, responses: Vec<MockLlmResponse>) {
628        *self.responses.write().await = responses;
629        *self.call_index.write().await = 0;
630    }
631
632    /// Get the call log
633    pub async fn calls(&self) -> Vec<Vec<LlmMessage>> {
634        self.call_log.read().await.clone()
635    }
636
637    /// Reset the provider
638    pub async fn reset(&self) {
639        self.responses.write().await.clear();
640        *self.call_index.write().await = 0;
641        self.call_log.write().await.clear();
642    }
643}
644
645#[async_trait]
646impl LlmDriver for MockLlmProvider {
647    async fn chat_completion_stream(
648        &self,
649        messages: Vec<LlmMessage>,
650        _config: &LlmCallConfig,
651    ) -> Result<LlmResponseStream> {
652        // Log the call
653        self.call_log.write().await.push(messages);
654
655        // Get next response
656        let mut index = self.call_index.write().await;
657        let responses = self.responses.read().await;
658
659        let response = responses.get(*index).cloned().unwrap_or_else(|| {
660            MockLlmResponse::text("Mock response (no more responses configured)")
661        });
662
663        *index += 1;
664        drop(index);
665        drop(responses);
666
667        // Create a stream that emits the response
668        let events = vec![
669            Ok(LlmStreamEvent::TextDelta(response.text.clone())),
670            if let Some(tool_calls) = response.tool_calls {
671                Ok(LlmStreamEvent::ToolCalls(tool_calls))
672            } else {
673                Ok(LlmStreamEvent::Done(Box::default()))
674            },
675            Ok(LlmStreamEvent::Done(Box::default())),
676        ];
677
678        Ok(Box::pin(stream::iter(events)))
679    }
680}
681
682// ============================================================================
683// InMemoryEventEmitter - Stores events in memory for testing
684// ============================================================================
685
686/// In-memory event emitter for testing
687///
688/// Stores emitted events in memory for inspection.
689/// Useful for testing and examples where you want to verify events without a database.
690///
691/// # Example
692///
693/// ```ignore
694/// use everruns_core::memory::InMemoryEventEmitter;
695///
696/// let emitter = InMemoryEventEmitter::new();
697///
698/// // Emit events...
699///
700/// // Check emitted events
701/// let events = emitter.events().await;
702/// assert_eq!(events.len(), 2);
703/// ```
704#[derive(Debug, Default, Clone)]
705pub struct InMemoryEventEmitter {
706    events: Arc<RwLock<Vec<Event>>>,
707    sequence: Arc<RwLock<i32>>,
708}
709
710impl InMemoryEventEmitter {
711    /// Create a new in-memory event emitter
712    pub fn new() -> Self {
713        Self {
714            events: Arc::new(RwLock::new(Vec::new())),
715            sequence: Arc::new(RwLock::new(0)),
716        }
717    }
718
719    /// Get all emitted events
720    pub async fn events(&self) -> Vec<Event> {
721        self.events.read().await.clone()
722    }
723
724    /// Get the count of emitted events
725    pub async fn event_count(&self) -> usize {
726        self.events.read().await.len()
727    }
728
729    /// Clear all events
730    pub async fn clear(&self) {
731        self.events.write().await.clear();
732        *self.sequence.write().await = 0;
733    }
734
735    /// Get events by type
736    pub async fn events_by_type(&self, event_type: &str) -> Vec<Event> {
737        self.events
738            .read()
739            .await
740            .iter()
741            .filter(|e| e.event_type == event_type)
742            .cloned()
743            .collect()
744    }
745
746    /// Get events for a specific session
747    pub async fn events_for_session(&self, session_id: Uuid) -> Vec<Event> {
748        self.events
749            .read()
750            .await
751            .iter()
752            .filter(|e| e.session_uuid() == session_id)
753            .cloned()
754            .collect()
755    }
756}
757
758#[async_trait]
759impl EventEmitter for InMemoryEventEmitter {
760    async fn emit(&self, request: EventRequest) -> Result<Event> {
761        let mut sequence = self.sequence.write().await;
762        *sequence += 1;
763        let seq = *sequence;
764        drop(sequence);
765
766        // Convert EventRequest to Event with generated id and sequence
767        let event = request.into_event(EventId::new(), seq);
768        self.events.write().await.push(event.clone());
769        Ok(event)
770    }
771}
772
773#[cfg(test)]
774mod tests {
775    use super::*;
776    use uuid::Uuid;
777
778    #[tokio::test]
779    async fn test_in_memory_message_retriever() {
780        let store = InMemoryMessageRetriever::new();
781        let session_id: SessionId = Uuid::now_v7().into();
782
783        store
784            .store(session_id, Message::user("Hello"))
785            .await
786            .unwrap();
787
788        let messages = store.load(session_id).await.unwrap();
789        assert_eq!(messages.len(), 1);
790        assert_eq!(messages[0].text(), Some("Hello"));
791    }
792
793    #[tokio::test]
794    async fn test_in_memory_message_retriever_add_and_get() {
795        let store = InMemoryMessageRetriever::new();
796        let session_id: SessionId = Uuid::now_v7().into();
797
798        // Add a message using the add method
799        let message = store
800            .add(session_id, InputMessage::user("Hello via add"))
801            .await
802            .unwrap();
803
804        // Get the message by ID
805        let retrieved = store.get(session_id, message.id).await.unwrap();
806        assert!(retrieved.is_some());
807        assert_eq!(retrieved.unwrap().text(), Some("Hello via add"));
808
809        // Get non-existent message
810        let missing = store.get(session_id, MessageId::new()).await.unwrap();
811        assert!(missing.is_none());
812    }
813
814    /// Regression test: add() must return message with ID usable for get()
815    ///
816    /// This test documents a critical invariant: the ID in the message returned by
817    /// add() must match the ID stored internally, so that get(returned_id) succeeds.
818    #[tokio::test]
819    async fn test_message_retriever_add_returns_consistent_id() {
820        let store = InMemoryMessageRetriever::new();
821        let session_id: SessionId = Uuid::now_v7().into();
822
823        // Add a message
824        let added = store
825            .add(session_id, InputMessage::user("Test consistency"))
826            .await
827            .unwrap();
828
829        // The returned message ID must be retrievable
830        let retrieved = store.get(session_id, added.id).await.unwrap();
831        assert!(
832            retrieved.is_some(),
833            "Message must be retrievable by the ID returned from add()"
834        );
835
836        // The retrieved message must have the same ID
837        let retrieved = retrieved.unwrap();
838        assert_eq!(
839            retrieved.id, added.id,
840            "Retrieved message ID must match the ID returned from add()"
841        );
842
843        // The message must also appear in load() with the same ID
844        let all_messages = store.load(session_id).await.unwrap();
845        let found = all_messages.iter().find(|m| m.id == added.id);
846        assert!(
847            found.is_some(),
848            "Message with returned ID must appear in load() results"
849        );
850    }
851
852    #[tokio::test]
853    async fn test_mock_tool_executor() {
854        let executor = MockToolExecutor::new();
855        executor
856            .set_result("get_weather", serde_json::json!({"temp": 72}))
857            .await;
858
859        let tool_call = ToolCall {
860            id: "call_1".to_string(),
861            name: "get_weather".to_string(),
862            arguments: serde_json::json!({"city": "NYC"}),
863        };
864
865        let tool_def = ToolDefinition::Builtin(crate::tool_types::BuiltinTool {
866            name: "get_weather".to_string(),
867            display_name: None,
868            description: "Get weather".to_string(),
869            parameters: serde_json::json!({}),
870            policy: crate::tool_types::ToolPolicy::Auto,
871            category: None,
872            deferrable: crate::tool_types::DeferrablePolicy::default(),
873            hints: crate::tool_types::ToolHints::default(),
874            full_parameters: None,
875        });
876
877        let result = executor.execute(&tool_call, &tool_def).await.unwrap();
878
879        assert!(result.error.is_none());
880        assert_eq!(result.result, Some(serde_json::json!({"temp": 72})));
881    }
882
883    #[tokio::test]
884    async fn test_in_memory_event_emitter() {
885        use crate::events::{EventContext, EventRequest, InputMessageData};
886
887        let emitter = InMemoryEventEmitter::new();
888        let session_id: SessionId = Uuid::now_v7().into();
889        let event_context = EventContext::empty();
890
891        // Emit an event
892        let event1 = emitter
893            .emit(EventRequest::new(
894                session_id,
895                event_context.clone(),
896                InputMessageData::new(Message::user("test1")),
897            ))
898            .await
899            .unwrap();
900        assert_eq!(event1.sequence, Some(1));
901
902        // Emit another event
903        let event2 = emitter
904            .emit(EventRequest::new(
905                session_id,
906                event_context,
907                InputMessageData::new(Message::user("test2")),
908            ))
909            .await
910            .unwrap();
911        assert_eq!(event2.sequence, Some(2));
912
913        // Check events
914        let events = emitter.events().await;
915        assert_eq!(events.len(), 2);
916        assert_eq!(emitter.event_count().await, 2);
917    }
918
919    #[tokio::test]
920    async fn test_in_memory_event_emitter_filter_by_type() {
921        use crate::events::{
922            EventContext, EventRequest, INPUT_MESSAGE, InputMessageData, REASON_STARTED,
923            ReasonStartedData,
924        };
925
926        let emitter = InMemoryEventEmitter::new();
927        let session_id: SessionId = Uuid::now_v7().into();
928        let event_context = EventContext::empty();
929
930        // Emit different event types
931        emitter
932            .emit(EventRequest::new(
933                session_id,
934                event_context.clone(),
935                InputMessageData::new(Message::user("test")),
936            ))
937            .await
938            .unwrap();
939
940        emitter
941            .emit(EventRequest::new(
942                session_id,
943                event_context,
944                ReasonStartedData {
945                    harness_id: HarnessId::from_seed(1),
946                    agent_id: Some(AgentId::new()),
947                    metadata: None,
948                },
949            ))
950            .await
951            .unwrap();
952
953        // Filter by type
954        let received_events = emitter.events_by_type(INPUT_MESSAGE).await;
955        assert_eq!(received_events.len(), 1);
956
957        let started_events = emitter.events_by_type(REASON_STARTED).await;
958        assert_eq!(started_events.len(), 1);
959    }
960
961    #[tokio::test]
962    async fn test_in_memory_event_emitter_filter_by_session() {
963        use crate::events::{EventContext, EventRequest, InputMessageData};
964
965        let emitter = InMemoryEventEmitter::new();
966        let session1: SessionId = Uuid::now_v7().into();
967        let session2: SessionId = Uuid::now_v7().into();
968
969        // Emit events for different sessions
970        let context = EventContext::empty();
971
972        emitter
973            .emit(EventRequest::new(
974                session1,
975                context.clone(),
976                InputMessageData::new(Message::user("session1")),
977            ))
978            .await
979            .unwrap();
980        emitter
981            .emit(EventRequest::new(
982                session2,
983                context,
984                InputMessageData::new(Message::user("session2")),
985            ))
986            .await
987            .unwrap();
988
989        // Filter by session
990        let session1_events = emitter.events_for_session(session1.uuid()).await;
991        assert_eq!(session1_events.len(), 1);
992
993        let session2_events = emitter.events_for_session(session2.uuid()).await;
994        assert_eq!(session2_events.len(), 1);
995    }
996
997    #[tokio::test]
998    async fn test_in_memory_event_emitter_clear() {
999        use crate::events::{EventContext, EventRequest, InputMessageData};
1000
1001        let emitter = InMemoryEventEmitter::new();
1002        let session_id: SessionId = Uuid::now_v7().into();
1003        let event_context = EventContext::empty();
1004
1005        emitter
1006            .emit(EventRequest::new(
1007                session_id,
1008                event_context,
1009                InputMessageData::new(Message::user("test")),
1010            ))
1011            .await
1012            .unwrap();
1013
1014        assert_eq!(emitter.event_count().await, 1);
1015
1016        emitter.clear().await;
1017
1018        assert_eq!(emitter.event_count().await, 0);
1019    }
1020}
1021
1022// ============================================================================
1023// InMemoryMemoryStore — for dev mode and testing
1024// ============================================================================
1025
1026use crate::memory_store::{
1027    Memory, MemoryContentPart, MemoryKind, MemoryQuery, MemoryStoreBackend, MemoryStoreEntity,
1028};
1029use crate::typed_id::{MemoryId, MemoryStoreId, OrgId};
1030
1031/// In-memory implementation of `MemoryStoreBackend` for dev mode and testing.
1032#[derive(Debug, Default, Clone)]
1033pub struct InMemoryMemoryStore {
1034    stores: Arc<RwLock<Vec<MemoryStoreEntity>>>,
1035    memories: Arc<RwLock<Vec<Memory>>>,
1036}
1037
1038impl InMemoryMemoryStore {
1039    pub fn new() -> Self {
1040        Self::default()
1041    }
1042}
1043
1044#[async_trait]
1045impl MemoryStoreBackend for InMemoryMemoryStore {
1046    async fn get_or_create_default_store(&self, org_id: OrgId) -> Result<MemoryStoreEntity> {
1047        let mut stores = self.stores.write().await;
1048        if let Some(store) = stores.iter().find(|s| s.org_id == org_id && s.is_default) {
1049            return Ok(store.clone());
1050        }
1051        let store = MemoryStoreEntity {
1052            id: MemoryStoreId::new(),
1053            org_id,
1054            name: "default".to_string(),
1055            is_default: true,
1056            created_at: chrono::Utc::now(),
1057        };
1058        stores.push(store.clone());
1059        Ok(store)
1060    }
1061
1062    async fn get_store(&self, store_id: MemoryStoreId) -> Result<Option<MemoryStoreEntity>> {
1063        Ok(self
1064            .stores
1065            .read()
1066            .await
1067            .iter()
1068            .find(|s| s.id == store_id)
1069            .cloned())
1070    }
1071
1072    async fn create_memory(
1073        &self,
1074        store_id: MemoryStoreId,
1075        content: String,
1076        content_parts: Vec<MemoryContentPart>,
1077        kind: MemoryKind,
1078        importance: u8,
1079        tags: Vec<String>,
1080    ) -> Result<Memory> {
1081        let now = chrono::Utc::now();
1082        let memory = Memory {
1083            id: MemoryId::new(),
1084            store_id,
1085            content,
1086            content_parts,
1087            kind,
1088            importance: importance.clamp(1, 10),
1089            tags,
1090            active: true,
1091            created_at: now,
1092            updated_at: now,
1093        };
1094        self.memories.write().await.push(memory.clone());
1095        Ok(memory)
1096    }
1097
1098    async fn recall(&self, query: MemoryQuery) -> Result<(Vec<Memory>, usize)> {
1099        let memories = self.memories.read().await;
1100        let mut results: Vec<&Memory> = memories
1101            .iter()
1102            .filter(|m| m.active)
1103            .filter(|m| {
1104                if let Some(ref sid) = query.store_id {
1105                    m.store_id == *sid
1106                } else {
1107                    true
1108                }
1109            })
1110            .filter(|m| {
1111                if let Some(ref kind) = query.kind {
1112                    m.kind == *kind
1113                } else {
1114                    true
1115                }
1116            })
1117            .filter(|m| {
1118                if let Some(ref tags) = query.tags {
1119                    tags.iter().all(|t| m.tags.contains(t))
1120                } else {
1121                    true
1122                }
1123            })
1124            .filter(|m| {
1125                if let Some(ref q) = query.query {
1126                    let q_lower = q.to_lowercase();
1127                    m.content.to_lowercase().contains(&q_lower)
1128                        || m.tags.iter().any(|t| t.to_lowercase().contains(&q_lower))
1129                } else {
1130                    true
1131                }
1132            })
1133            .collect();
1134
1135        // Sort by importance desc, then by created_at desc
1136        results.sort_by(|a, b| {
1137            b.importance
1138                .cmp(&a.importance)
1139                .then_with(|| b.created_at.cmp(&a.created_at))
1140        });
1141
1142        let total = results.len();
1143        let limit = if query.limit > 0 { query.limit } else { 10 };
1144        let results: Vec<Memory> = results.into_iter().take(limit).cloned().collect();
1145        Ok((results, total))
1146    }
1147
1148    async fn forget(&self, store_id: MemoryStoreId, memory_id: MemoryId) -> Result<bool> {
1149        let mut memories = self.memories.write().await;
1150        if let Some(m) = memories
1151            .iter_mut()
1152            .find(|m| m.id == memory_id && m.store_id == store_id && m.active)
1153        {
1154            m.active = false;
1155            m.updated_at = chrono::Utc::now();
1156            Ok(true)
1157        } else {
1158            Ok(false)
1159        }
1160    }
1161
1162    async fn count_active(&self, store_id: MemoryStoreId) -> Result<usize> {
1163        Ok(self
1164            .memories
1165            .read()
1166            .await
1167            .iter()
1168            .filter(|m| m.store_id == store_id && m.active)
1169            .count())
1170    }
1171}