Skip to main content

everruns_runtime/
in_memory.rs

1// In-memory runtime stores.
2// Decision: runtime ships batteries-included in-memory stores for public
3// embedding so common capabilities work without depending on server internals.
4
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use everruns_core::error::{AgentLoopError, Result};
8use everruns_core::session::Session;
9use everruns_core::session_file::{FileInfo, FileStat, GrepMatch, InitialFile, SessionFile};
10use everruns_core::traits::{
11    KeyInfo, SecretInfo, SessionFileSystem, SessionFileSystemFactory,
12    SessionFileSystemFactoryContext, SessionMutator, SessionStorageStore, SessionStore,
13};
14use everruns_core::typed_id::SessionId;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17use tokio::sync::RwLock;
18use uuid::Uuid;
19
/// In-memory `SessionStore` + `SessionMutator` for embedded runtimes.
///
/// This is the default session backend used by [`crate::InProcessRuntime`].
///
/// The session map lives behind an `Arc`, so clones of the store read and
/// write the same set of sessions.
#[derive(Debug, Default, Clone)]
pub struct InMemorySessionStore {
    /// Sessions keyed by id, guarded by an async read/write lock.
    sessions: Arc<RwLock<HashMap<SessionId, Session>>>,
}
27
28impl InMemorySessionStore {
29    /// Create an empty in-memory session store.
30    pub fn new() -> Self {
31        Self {
32            sessions: Arc::new(RwLock::new(HashMap::new())),
33        }
34    }
35
36    /// Insert or replace a session in the store.
37    pub async fn add_session(&self, session: Session) {
38        self.sessions.write().await.insert(session.id, session);
39    }
40}
41
42#[async_trait]
43impl SessionStore for InMemorySessionStore {
44    async fn get_session(&self, session_id: SessionId) -> Result<Option<Session>> {
45        Ok(self.sessions.read().await.get(&session_id).cloned())
46    }
47}
48
49#[async_trait]
50impl SessionMutator for InMemorySessionStore {
51    async fn update_session_title(&self, session_id: SessionId, title: String) -> Result<Session> {
52        let mut sessions = self.sessions.write().await;
53        let session = sessions
54            .get_mut(&session_id)
55            .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
56        session.title = Some(title);
57        session.updated_at = Utc::now();
58        Ok(session.clone())
59    }
60}
61
/// Map value of the in-memory file store; wraps one stored [`SessionFile`].
#[derive(Debug, Clone)]
struct FileEntry {
    /// The file or directory record (directories carry no content).
    file: SessionFile,
}
66
/// In-memory implementation of the session virtual filesystem.
///
/// Paths accept either canonical session paths (`/notes.txt`) or `/workspace`
/// prefixed paths (`/workspace/notes.txt`).
///
/// The entry map lives behind an `Arc`, so clones of the store share the same
/// files.
#[derive(Debug, Default, Clone)]
pub struct InMemorySessionFileStore {
    /// Files and directories keyed by `(session id, normalized path)`.
    /// The root directory `/` is synthesized on read rather than stored.
    files: Arc<RwLock<HashMap<(SessionId, String), FileEntry>>>,
}
75
/// Factory for the runtime's in-memory session filesystem.
///
/// Stateless unit struct: its `create_session_file_system` implementation
/// builds a brand-new, empty [`InMemorySessionFileStore`] on every call.
#[derive(Debug, Clone, Default)]
pub struct InMemorySessionFileSystemFactory;
79
80#[async_trait]
81impl SessionFileSystemFactory for InMemorySessionFileSystemFactory {
82    fn name(&self) -> &'static str {
83        "InMemorySessionFileSystemFactory"
84    }
85
86    async fn create_session_file_system(
87        &self,
88        _context: SessionFileSystemFactoryContext,
89    ) -> Result<Arc<dyn SessionFileSystem>> {
90        Ok(Arc::new(InMemorySessionFileStore::new()))
91    }
92}
93
94impl InMemorySessionFileStore {
95    /// Create an empty in-memory file store.
96    pub fn new() -> Self {
97        Self {
98            files: Arc::new(RwLock::new(HashMap::new())),
99        }
100    }
101
102    /// Seed a file into a session workspace.
103    pub async fn seed_initial_file(&self, session_id: SessionId, file: &InitialFile) -> Result<()> {
104        let path = normalize_path(&file.path);
105        self.ensure_parent_directories(session_id, &path).await?;
106        self.upsert_file(
107            session_id,
108            &path,
109            &file.content,
110            &file.encoding,
111            file.is_readonly,
112        )
113        .await
114        .map(|_| ())
115    }
116
117    async fn ensure_parent_directories(&self, session_id: SessionId, path: &str) -> Result<()> {
118        let mut current = String::new();
119        for segment in path.trim_start_matches('/').split('/').collect::<Vec<_>>() {
120            if segment.is_empty() {
121                continue;
122            }
123            current.push('/');
124            current.push_str(segment);
125            let is_leaf = current == path;
126            if is_leaf {
127                break;
128            }
129            self.insert_directory_if_missing(session_id, &current)
130                .await?;
131        }
132        Ok(())
133    }
134
135    async fn insert_directory_if_missing(&self, session_id: SessionId, path: &str) -> Result<()> {
136        let path = normalize_path(path);
137        if path == "/" {
138            return Ok(());
139        }
140
141        let mut files = self.files.write().await;
142        files
143            .entry((session_id, path.clone()))
144            .or_insert_with(|| FileEntry {
145                file: SessionFile {
146                    id: Uuid::now_v7(),
147                    session_id: session_id.uuid(),
148                    path: path.clone(),
149                    name: FileInfo::name_from_path(&path),
150                    content: None,
151                    encoding: "text".to_string(),
152                    is_directory: true,
153                    is_readonly: false,
154                    size_bytes: 0,
155                    created_at: Utc::now(),
156                    updated_at: Utc::now(),
157                },
158            });
159        Ok(())
160    }
161
162    async fn upsert_file(
163        &self,
164        session_id: SessionId,
165        path: &str,
166        content: &str,
167        encoding: &str,
168        is_readonly: bool,
169    ) -> Result<SessionFile> {
170        let now = Utc::now();
171        let normalized = normalize_path(path);
172        let mut files = self.files.write().await;
173        let key = (session_id, normalized.clone());
174
175        let file = files
176            .entry(key)
177            .and_modify(|entry| {
178                entry.file.content = Some(content.to_string());
179                entry.file.encoding = encoding.to_string();
180                entry.file.is_directory = false;
181                entry.file.is_readonly = is_readonly;
182                entry.file.size_bytes = content.len() as i64;
183                entry.file.updated_at = now;
184            })
185            .or_insert_with(|| FileEntry {
186                file: SessionFile {
187                    id: Uuid::now_v7(),
188                    session_id: session_id.uuid(),
189                    path: normalized.clone(),
190                    name: FileInfo::name_from_path(&normalized),
191                    content: Some(content.to_string()),
192                    encoding: encoding.to_string(),
193                    is_directory: false,
194                    is_readonly,
195                    size_bytes: content.len() as i64,
196                    created_at: now,
197                    updated_at: now,
198                },
199            })
200            .file
201            .clone();
202
203        Ok(file)
204    }
205
206    /// Read a text file from the workspace, returning `None` when absent.
207    pub async fn read_text(&self, session_id: SessionId, path: &str) -> Option<String> {
208        self.read_file(session_id, path)
209            .await
210            .ok()
211            .flatten()
212            .and_then(|file| file.content)
213    }
214}
215
216#[async_trait]
217impl SessionFileSystem for InMemorySessionFileStore {
218    async fn seed_initial_file(&self, session_id: SessionId, file: &InitialFile) -> Result<()> {
219        InMemorySessionFileStore::seed_initial_file(self, session_id, file).await
220    }
221
222    async fn read_file(&self, session_id: SessionId, path: &str) -> Result<Option<SessionFile>> {
223        let normalized = normalize_path(path);
224        if normalized == "/" {
225            return Ok(Some(root_directory(session_id)));
226        }
227
228        Ok(self
229            .files
230            .read()
231            .await
232            .get(&(session_id, normalized))
233            .map(|entry| entry.file.clone()))
234    }
235
236    async fn write_file(
237        &self,
238        session_id: SessionId,
239        path: &str,
240        content: &str,
241        encoding: &str,
242    ) -> Result<SessionFile> {
243        let normalized = normalize_path(path);
244        self.ensure_parent_directories(session_id, &normalized)
245            .await?;
246
247        if let Some(existing) = self.read_file(session_id, &normalized).await?
248            && existing.is_readonly
249        {
250            return Err(AgentLoopError::tool(format!(
251                "file is read-only: {}",
252                normalized
253            )));
254        }
255
256        self.upsert_file(session_id, &normalized, content, encoding, false)
257            .await
258    }
259
260    async fn delete_file(
261        &self,
262        session_id: SessionId,
263        path: &str,
264        recursive: bool,
265    ) -> Result<bool> {
266        let normalized = normalize_path(path);
267        if normalized == "/" {
268            return Ok(false);
269        }
270
271        let mut files = self.files.write().await;
272        let key = (session_id, normalized.clone());
273        let Some(existing) = files.get(&key).cloned() else {
274            return Ok(false);
275        };
276
277        if existing.file.is_readonly {
278            return Ok(false);
279        }
280
281        if existing.file.is_directory {
282            let prefix = format!("{normalized}/");
283            let has_children = files
284                .keys()
285                .any(|(sid, candidate)| *sid == session_id && candidate.starts_with(&prefix));
286            if has_children && !recursive {
287                return Ok(false);
288            }
289            files.retain(|(sid, candidate), _| {
290                !(*sid == session_id
291                    && (candidate == &normalized || candidate.starts_with(&prefix)))
292            });
293            return Ok(true);
294        }
295
296        Ok(files.remove(&key).is_some())
297    }
298
299    async fn list_directory(&self, session_id: SessionId, path: &str) -> Result<Vec<FileInfo>> {
300        let normalized = normalize_path(path);
301        if normalized != "/" {
302            let Some(dir) = self.read_file(session_id, &normalized).await? else {
303                return Ok(vec![]);
304            };
305            if !dir.is_directory {
306                return Ok(vec![]);
307            }
308        }
309
310        let files = self.files.read().await;
311        let mut infos = Vec::new();
312        let mut seen = BTreeSet::new();
313
314        for ((sid, candidate), entry) in files.iter() {
315            if *sid != session_id {
316                continue;
317            }
318            if FileInfo::parent_path(candidate).as_deref() != Some(normalized.as_str()) {
319                continue;
320            }
321            if seen.insert(candidate.clone()) {
322                infos.push(file_info(&entry.file));
323            }
324        }
325
326        infos.sort_by(|a, b| a.path.cmp(&b.path));
327        Ok(infos)
328    }
329
330    async fn stat_file(&self, session_id: SessionId, path: &str) -> Result<Option<FileStat>> {
331        let normalized = normalize_path(path);
332        if normalized == "/" {
333            let root = root_directory(session_id);
334            return Ok(Some(FileStat {
335                path: root.path,
336                name: root.name,
337                is_directory: true,
338                is_readonly: false,
339                size_bytes: 0,
340                created_at: root.created_at,
341                updated_at: root.updated_at,
342            }));
343        }
344
345        Ok(self
346            .files
347            .read()
348            .await
349            .get(&(session_id, normalized))
350            .map(|entry| FileStat {
351                path: entry.file.path.clone(),
352                name: entry.file.name.clone(),
353                is_directory: entry.file.is_directory,
354                is_readonly: entry.file.is_readonly,
355                size_bytes: entry.file.size_bytes,
356                created_at: entry.file.created_at,
357                updated_at: entry.file.updated_at,
358            }))
359    }
360
361    async fn grep_files(
362        &self,
363        session_id: SessionId,
364        pattern: &str,
365        path_pattern: Option<&str>,
366    ) -> Result<Vec<GrepMatch>> {
367        let files = self.files.read().await;
368        let mut matches = Vec::new();
369
370        for ((sid, path), entry) in files.iter() {
371            if *sid != session_id || entry.file.is_directory || entry.file.encoding != "text" {
372                continue;
373            }
374            if let Some(path_pattern) = path_pattern
375                && !path.contains(path_pattern)
376            {
377                continue;
378            }
379
380            let Some(content) = &entry.file.content else {
381                continue;
382            };
383
384            for (idx, line) in content.lines().enumerate() {
385                if line.contains(pattern) {
386                    matches.push(GrepMatch {
387                        path: path.clone(),
388                        line_number: idx + 1,
389                        line: line.to_string(),
390                    });
391                }
392            }
393        }
394
395        Ok(matches)
396    }
397
398    async fn create_directory(&self, session_id: SessionId, path: &str) -> Result<FileInfo> {
399        let normalized = normalize_path(path);
400        self.ensure_parent_directories(session_id, &normalized)
401            .await?;
402        self.insert_directory_if_missing(session_id, &normalized)
403            .await?;
404        let file = self
405            .read_file(session_id, &normalized)
406            .await?
407            .ok_or_else(|| AgentLoopError::store(format!("directory not found: {normalized}")))?;
408        Ok(file_info(&file))
409    }
410}
411
/// In-memory implementation of session key/value and secret storage.
///
/// Values and secrets are held in two separate maps, so a key and a secret
/// may share a name without colliding. Both maps live behind `Arc`s, so
/// clones of the store share the same data.
#[derive(Debug, Default, Clone)]
pub struct InMemorySessionStorageStore {
    /// Plain key/value entries keyed by `(session id, key)`.
    values: Arc<RwLock<HashMap<(SessionId, String), StorageValue>>>,
    /// Secret entries keyed by `(session id, secret name)`.
    secrets: Arc<RwLock<HashMap<(SessionId, String), StorageValue>>>,
}
418
/// A stored string together with its creation and last-update timestamps.
/// Used for both plain values and secrets.
#[derive(Debug, Clone)]
struct StorageValue {
    /// The stored payload.
    value: String,
    /// When the entry was first inserted; not changed by later updates.
    created_at: DateTime<Utc>,
    /// When the entry was last written.
    updated_at: DateTime<Utc>,
}
425
426impl InMemorySessionStorageStore {
427    /// Create an empty in-memory storage store.
428    pub fn new() -> Self {
429        Self {
430            values: Arc::new(RwLock::new(HashMap::new())),
431            secrets: Arc::new(RwLock::new(HashMap::new())),
432        }
433    }
434}
435
436#[async_trait]
437impl SessionStorageStore for InMemorySessionStorageStore {
438    async fn set_value(&self, session_id: SessionId, key: &str, value: &str) -> Result<()> {
439        upsert_storage(&self.values, session_id, key, value).await;
440        Ok(())
441    }
442
443    async fn get_value(&self, session_id: SessionId, key: &str) -> Result<Option<String>> {
444        Ok(self
445            .values
446            .read()
447            .await
448            .get(&(session_id, key.to_string()))
449            .map(|value| value.value.clone()))
450    }
451
452    async fn delete_value(&self, session_id: SessionId, key: &str) -> Result<bool> {
453        Ok(self
454            .values
455            .write()
456            .await
457            .remove(&(session_id, key.to_string()))
458            .is_some())
459    }
460
461    async fn list_keys(&self, session_id: SessionId) -> Result<Vec<KeyInfo>> {
462        Ok(list_storage(&self.values, session_id)
463            .await
464            .into_iter()
465            .map(|(key, value)| KeyInfo {
466                key,
467                created_at: value.created_at,
468                updated_at: value.updated_at,
469            })
470            .collect())
471    }
472
473    async fn set_secret(&self, session_id: SessionId, name: &str, value: &str) -> Result<()> {
474        upsert_storage(&self.secrets, session_id, name, value).await;
475        Ok(())
476    }
477
478    async fn get_secret(&self, session_id: SessionId, name: &str) -> Result<Option<String>> {
479        Ok(self
480            .secrets
481            .read()
482            .await
483            .get(&(session_id, name.to_string()))
484            .map(|value| value.value.clone()))
485    }
486
487    async fn delete_secret(&self, session_id: SessionId, name: &str) -> Result<bool> {
488        Ok(self
489            .secrets
490            .write()
491            .await
492            .remove(&(session_id, name.to_string()))
493            .is_some())
494    }
495
496    async fn list_secrets(&self, session_id: SessionId) -> Result<Vec<SecretInfo>> {
497        Ok(list_storage(&self.secrets, session_id)
498            .await
499            .into_iter()
500            .map(|(name, value)| SecretInfo {
501                name,
502                created_at: value.created_at,
503                updated_at: value.updated_at,
504            })
505            .collect())
506    }
507}
508
509async fn upsert_storage(
510    map: &Arc<RwLock<HashMap<(SessionId, String), StorageValue>>>,
511    session_id: SessionId,
512    key: &str,
513    value: &str,
514) {
515    let mut map = map.write().await;
516    let now = Utc::now();
517    map.entry((session_id, key.to_string()))
518        .and_modify(|stored| {
519            stored.value = value.to_string();
520            stored.updated_at = now;
521        })
522        .or_insert_with(|| StorageValue {
523            value: value.to_string(),
524            created_at: now,
525            updated_at: now,
526        });
527}
528
529async fn list_storage(
530    map: &Arc<RwLock<HashMap<(SessionId, String), StorageValue>>>,
531    session_id: SessionId,
532) -> Vec<(String, StorageValue)> {
533    let mut values: Vec<_> = map
534        .read()
535        .await
536        .iter()
537        .filter(|((sid, _), _)| *sid == session_id)
538        .map(|((_, key), value)| (key.clone(), value.clone()))
539        .collect();
540    values.sort_by(|a, b| a.0.cmp(&b.0));
541    values
542}
543
/// Convert a caller-supplied path into the canonical key used by the store.
///
/// The result always starts with `/`, never ends with `/` (except for the
/// root itself), and contains no empty segments:
///
/// * `""`, `"/"`, `"/workspace"` and `"/workspace/"` map to `"/"`.
/// * Leading `workspace` segments are treated as an alias for the root and
///   removed (`/workspace/notes.txt` -> `/notes.txt`).
/// * Relative paths are anchored at the root (`notes.txt` -> `/notes.txt`).
/// * Repeated and trailing slashes are collapsed (`/a//b/` -> `/a/b`).
///
/// The function is idempotent: normalizing an already-normalized path
/// returns it unchanged. This matters because helpers in this file normalize
/// paths that their callers have already normalized; a non-idempotent
/// mapping makes writes and reads of the same input resolve to different
/// keys.
fn normalize_path(path: &str) -> String {
    let segments = path
        .split('/')
        .filter(|segment| !segment.is_empty())
        // Drop every leading alias segment, not just the first, so a second
        // pass over the result has nothing left to strip.
        .skip_while(|segment| *segment == "workspace");

    let mut normalized = String::with_capacity(path.len() + 1);
    for segment in segments {
        normalized.push('/');
        normalized.push_str(segment);
    }

    if normalized.is_empty() {
        normalized.push('/');
    }
    normalized
}
563
564fn root_directory(session_id: SessionId) -> SessionFile {
565    let now = Utc::now();
566    SessionFile {
567        id: Uuid::nil(),
568        session_id: session_id.uuid(),
569        path: "/".to_string(),
570        name: "/".to_string(),
571        content: None,
572        encoding: "text".to_string(),
573        is_directory: true,
574        is_readonly: false,
575        size_bytes: 0,
576        created_at: now,
577        updated_at: now,
578    }
579}
580
581fn file_info(file: &SessionFile) -> FileInfo {
582    FileInfo {
583        id: file.id,
584        session_id: file.session_id,
585        path: file.path.clone(),
586        name: file.name.clone(),
587        is_directory: file.is_directory,
588        is_readonly: file.is_readonly,
589        size_bytes: file.size_bytes,
590        created_at: file.created_at,
591        updated_at: file.updated_at,
592    }
593}