1use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use everruns_core::error::{AgentLoopError, Result};
8use everruns_core::session::Session;
9use everruns_core::session_file::{FileInfo, FileStat, GrepMatch, InitialFile, SessionFile};
10use everruns_core::traits::{
11 KeyInfo, SecretInfo, SessionFileSystem, SessionFileSystemFactory,
12 SessionFileSystemFactoryContext, SessionMutator, SessionStorageStore, SessionStore,
13};
14use everruns_core::typed_id::SessionId;
15use std::collections::{BTreeSet, HashMap};
16use std::sync::Arc;
17use tokio::sync::RwLock;
18use uuid::Uuid;
19
20#[derive(Debug, Default, Clone)]
24pub struct InMemorySessionStore {
25 sessions: Arc<RwLock<HashMap<SessionId, Session>>>,
26}
27
28impl InMemorySessionStore {
29 pub fn new() -> Self {
31 Self {
32 sessions: Arc::new(RwLock::new(HashMap::new())),
33 }
34 }
35
36 pub async fn add_session(&self, session: Session) {
38 self.sessions.write().await.insert(session.id, session);
39 }
40}
41
42#[async_trait]
43impl SessionStore for InMemorySessionStore {
44 async fn get_session(&self, session_id: SessionId) -> Result<Option<Session>> {
45 Ok(self.sessions.read().await.get(&session_id).cloned())
46 }
47}
48
49#[async_trait]
50impl SessionMutator for InMemorySessionStore {
51 async fn update_session_title(&self, session_id: SessionId, title: String) -> Result<Session> {
52 let mut sessions = self.sessions.write().await;
53 let session = sessions
54 .get_mut(&session_id)
55 .ok_or_else(|| AgentLoopError::store(format!("session not found: {session_id}")))?;
56 session.title = Some(title);
57 session.updated_at = Utc::now();
58 Ok(session.clone())
59 }
60}
61
62#[derive(Debug, Clone)]
63struct FileEntry {
64 file: SessionFile,
65}
66
67#[derive(Debug, Default, Clone)]
72pub struct InMemorySessionFileStore {
73 files: Arc<RwLock<HashMap<(SessionId, String), FileEntry>>>,
74}
75
76#[derive(Debug, Clone, Default)]
78pub struct InMemorySessionFileSystemFactory;
79
80#[async_trait]
81impl SessionFileSystemFactory for InMemorySessionFileSystemFactory {
82 fn name(&self) -> &'static str {
83 "InMemorySessionFileSystemFactory"
84 }
85
86 async fn create_session_file_system(
87 &self,
88 _context: SessionFileSystemFactoryContext,
89 ) -> Result<Arc<dyn SessionFileSystem>> {
90 Ok(Arc::new(InMemorySessionFileStore::new()))
91 }
92}
93
94impl InMemorySessionFileStore {
95 pub fn new() -> Self {
97 Self {
98 files: Arc::new(RwLock::new(HashMap::new())),
99 }
100 }
101
102 pub async fn seed_initial_file(&self, session_id: SessionId, file: &InitialFile) -> Result<()> {
104 let path = normalize_path(&file.path);
105 self.ensure_parent_directories(session_id, &path).await?;
106 self.upsert_file(
107 session_id,
108 &path,
109 &file.content,
110 &file.encoding,
111 file.is_readonly,
112 )
113 .await
114 .map(|_| ())
115 }
116
117 async fn ensure_parent_directories(&self, session_id: SessionId, path: &str) -> Result<()> {
118 let mut current = String::new();
119 for segment in path.trim_start_matches('/').split('/').collect::<Vec<_>>() {
120 if segment.is_empty() {
121 continue;
122 }
123 current.push('/');
124 current.push_str(segment);
125 let is_leaf = current == path;
126 if is_leaf {
127 break;
128 }
129 self.insert_directory_if_missing(session_id, ¤t)
130 .await?;
131 }
132 Ok(())
133 }
134
135 async fn insert_directory_if_missing(&self, session_id: SessionId, path: &str) -> Result<()> {
136 let path = normalize_path(path);
137 if path == "/" {
138 return Ok(());
139 }
140
141 let mut files = self.files.write().await;
142 files
143 .entry((session_id, path.clone()))
144 .or_insert_with(|| FileEntry {
145 file: SessionFile {
146 id: Uuid::now_v7(),
147 session_id: session_id.uuid(),
148 path: path.clone(),
149 name: FileInfo::name_from_path(&path),
150 content: None,
151 encoding: "text".to_string(),
152 is_directory: true,
153 is_readonly: false,
154 size_bytes: 0,
155 created_at: Utc::now(),
156 updated_at: Utc::now(),
157 },
158 });
159 Ok(())
160 }
161
162 async fn upsert_file(
163 &self,
164 session_id: SessionId,
165 path: &str,
166 content: &str,
167 encoding: &str,
168 is_readonly: bool,
169 ) -> Result<SessionFile> {
170 let now = Utc::now();
171 let normalized = normalize_path(path);
172 let mut files = self.files.write().await;
173 let key = (session_id, normalized.clone());
174
175 let file = files
176 .entry(key)
177 .and_modify(|entry| {
178 entry.file.content = Some(content.to_string());
179 entry.file.encoding = encoding.to_string();
180 entry.file.is_directory = false;
181 entry.file.is_readonly = is_readonly;
182 entry.file.size_bytes = content.len() as i64;
183 entry.file.updated_at = now;
184 })
185 .or_insert_with(|| FileEntry {
186 file: SessionFile {
187 id: Uuid::now_v7(),
188 session_id: session_id.uuid(),
189 path: normalized.clone(),
190 name: FileInfo::name_from_path(&normalized),
191 content: Some(content.to_string()),
192 encoding: encoding.to_string(),
193 is_directory: false,
194 is_readonly,
195 size_bytes: content.len() as i64,
196 created_at: now,
197 updated_at: now,
198 },
199 })
200 .file
201 .clone();
202
203 Ok(file)
204 }
205
206 pub async fn read_text(&self, session_id: SessionId, path: &str) -> Option<String> {
208 self.read_file(session_id, path)
209 .await
210 .ok()
211 .flatten()
212 .and_then(|file| file.content)
213 }
214}
215
216#[async_trait]
217impl SessionFileSystem for InMemorySessionFileStore {
218 async fn seed_initial_file(&self, session_id: SessionId, file: &InitialFile) -> Result<()> {
219 InMemorySessionFileStore::seed_initial_file(self, session_id, file).await
220 }
221
222 async fn read_file(&self, session_id: SessionId, path: &str) -> Result<Option<SessionFile>> {
223 let normalized = normalize_path(path);
224 if normalized == "/" {
225 return Ok(Some(root_directory(session_id)));
226 }
227
228 Ok(self
229 .files
230 .read()
231 .await
232 .get(&(session_id, normalized))
233 .map(|entry| entry.file.clone()))
234 }
235
236 async fn write_file(
237 &self,
238 session_id: SessionId,
239 path: &str,
240 content: &str,
241 encoding: &str,
242 ) -> Result<SessionFile> {
243 let normalized = normalize_path(path);
244 self.ensure_parent_directories(session_id, &normalized)
245 .await?;
246
247 if let Some(existing) = self.read_file(session_id, &normalized).await?
248 && existing.is_readonly
249 {
250 return Err(AgentLoopError::tool(format!(
251 "file is read-only: {}",
252 normalized
253 )));
254 }
255
256 self.upsert_file(session_id, &normalized, content, encoding, false)
257 .await
258 }
259
260 async fn delete_file(
261 &self,
262 session_id: SessionId,
263 path: &str,
264 recursive: bool,
265 ) -> Result<bool> {
266 let normalized = normalize_path(path);
267 if normalized == "/" {
268 return Ok(false);
269 }
270
271 let mut files = self.files.write().await;
272 let key = (session_id, normalized.clone());
273 let Some(existing) = files.get(&key).cloned() else {
274 return Ok(false);
275 };
276
277 if existing.file.is_readonly {
278 return Ok(false);
279 }
280
281 if existing.file.is_directory {
282 let prefix = format!("{normalized}/");
283 let has_children = files
284 .keys()
285 .any(|(sid, candidate)| *sid == session_id && candidate.starts_with(&prefix));
286 if has_children && !recursive {
287 return Ok(false);
288 }
289 files.retain(|(sid, candidate), _| {
290 !(*sid == session_id
291 && (candidate == &normalized || candidate.starts_with(&prefix)))
292 });
293 return Ok(true);
294 }
295
296 Ok(files.remove(&key).is_some())
297 }
298
299 async fn list_directory(&self, session_id: SessionId, path: &str) -> Result<Vec<FileInfo>> {
300 let normalized = normalize_path(path);
301 if normalized != "/" {
302 let Some(dir) = self.read_file(session_id, &normalized).await? else {
303 return Ok(vec![]);
304 };
305 if !dir.is_directory {
306 return Ok(vec![]);
307 }
308 }
309
310 let files = self.files.read().await;
311 let mut infos = Vec::new();
312 let mut seen = BTreeSet::new();
313
314 for ((sid, candidate), entry) in files.iter() {
315 if *sid != session_id {
316 continue;
317 }
318 if FileInfo::parent_path(candidate).as_deref() != Some(normalized.as_str()) {
319 continue;
320 }
321 if seen.insert(candidate.clone()) {
322 infos.push(file_info(&entry.file));
323 }
324 }
325
326 infos.sort_by(|a, b| a.path.cmp(&b.path));
327 Ok(infos)
328 }
329
330 async fn stat_file(&self, session_id: SessionId, path: &str) -> Result<Option<FileStat>> {
331 let normalized = normalize_path(path);
332 if normalized == "/" {
333 let root = root_directory(session_id);
334 return Ok(Some(FileStat {
335 path: root.path,
336 name: root.name,
337 is_directory: true,
338 is_readonly: false,
339 size_bytes: 0,
340 created_at: root.created_at,
341 updated_at: root.updated_at,
342 }));
343 }
344
345 Ok(self
346 .files
347 .read()
348 .await
349 .get(&(session_id, normalized))
350 .map(|entry| FileStat {
351 path: entry.file.path.clone(),
352 name: entry.file.name.clone(),
353 is_directory: entry.file.is_directory,
354 is_readonly: entry.file.is_readonly,
355 size_bytes: entry.file.size_bytes,
356 created_at: entry.file.created_at,
357 updated_at: entry.file.updated_at,
358 }))
359 }
360
361 async fn grep_files(
362 &self,
363 session_id: SessionId,
364 pattern: &str,
365 path_pattern: Option<&str>,
366 ) -> Result<Vec<GrepMatch>> {
367 let files = self.files.read().await;
368 let mut matches = Vec::new();
369
370 for ((sid, path), entry) in files.iter() {
371 if *sid != session_id || entry.file.is_directory || entry.file.encoding != "text" {
372 continue;
373 }
374 if let Some(path_pattern) = path_pattern
375 && !path.contains(path_pattern)
376 {
377 continue;
378 }
379
380 let Some(content) = &entry.file.content else {
381 continue;
382 };
383
384 for (idx, line) in content.lines().enumerate() {
385 if line.contains(pattern) {
386 matches.push(GrepMatch {
387 path: path.clone(),
388 line_number: idx + 1,
389 line: line.to_string(),
390 });
391 }
392 }
393 }
394
395 Ok(matches)
396 }
397
398 async fn create_directory(&self, session_id: SessionId, path: &str) -> Result<FileInfo> {
399 let normalized = normalize_path(path);
400 self.ensure_parent_directories(session_id, &normalized)
401 .await?;
402 self.insert_directory_if_missing(session_id, &normalized)
403 .await?;
404 let file = self
405 .read_file(session_id, &normalized)
406 .await?
407 .ok_or_else(|| AgentLoopError::store(format!("directory not found: {normalized}")))?;
408 Ok(file_info(&file))
409 }
410}
411
412#[derive(Debug, Default, Clone)]
414pub struct InMemorySessionStorageStore {
415 values: Arc<RwLock<HashMap<(SessionId, String), StorageValue>>>,
416 secrets: Arc<RwLock<HashMap<(SessionId, String), StorageValue>>>,
417}
418
419#[derive(Debug, Clone)]
420struct StorageValue {
421 value: String,
422 created_at: DateTime<Utc>,
423 updated_at: DateTime<Utc>,
424}
425
426impl InMemorySessionStorageStore {
427 pub fn new() -> Self {
429 Self {
430 values: Arc::new(RwLock::new(HashMap::new())),
431 secrets: Arc::new(RwLock::new(HashMap::new())),
432 }
433 }
434}
435
436#[async_trait]
437impl SessionStorageStore for InMemorySessionStorageStore {
438 async fn set_value(&self, session_id: SessionId, key: &str, value: &str) -> Result<()> {
439 upsert_storage(&self.values, session_id, key, value).await;
440 Ok(())
441 }
442
443 async fn get_value(&self, session_id: SessionId, key: &str) -> Result<Option<String>> {
444 Ok(self
445 .values
446 .read()
447 .await
448 .get(&(session_id, key.to_string()))
449 .map(|value| value.value.clone()))
450 }
451
452 async fn delete_value(&self, session_id: SessionId, key: &str) -> Result<bool> {
453 Ok(self
454 .values
455 .write()
456 .await
457 .remove(&(session_id, key.to_string()))
458 .is_some())
459 }
460
461 async fn list_keys(&self, session_id: SessionId) -> Result<Vec<KeyInfo>> {
462 Ok(list_storage(&self.values, session_id)
463 .await
464 .into_iter()
465 .map(|(key, value)| KeyInfo {
466 key,
467 created_at: value.created_at,
468 updated_at: value.updated_at,
469 })
470 .collect())
471 }
472
473 async fn set_secret(&self, session_id: SessionId, name: &str, value: &str) -> Result<()> {
474 upsert_storage(&self.secrets, session_id, name, value).await;
475 Ok(())
476 }
477
478 async fn get_secret(&self, session_id: SessionId, name: &str) -> Result<Option<String>> {
479 Ok(self
480 .secrets
481 .read()
482 .await
483 .get(&(session_id, name.to_string()))
484 .map(|value| value.value.clone()))
485 }
486
487 async fn delete_secret(&self, session_id: SessionId, name: &str) -> Result<bool> {
488 Ok(self
489 .secrets
490 .write()
491 .await
492 .remove(&(session_id, name.to_string()))
493 .is_some())
494 }
495
496 async fn list_secrets(&self, session_id: SessionId) -> Result<Vec<SecretInfo>> {
497 Ok(list_storage(&self.secrets, session_id)
498 .await
499 .into_iter()
500 .map(|(name, value)| SecretInfo {
501 name,
502 created_at: value.created_at,
503 updated_at: value.updated_at,
504 })
505 .collect())
506 }
507}
508
509async fn upsert_storage(
510 map: &Arc<RwLock<HashMap<(SessionId, String), StorageValue>>>,
511 session_id: SessionId,
512 key: &str,
513 value: &str,
514) {
515 let mut map = map.write().await;
516 let now = Utc::now();
517 map.entry((session_id, key.to_string()))
518 .and_modify(|stored| {
519 stored.value = value.to_string();
520 stored.updated_at = now;
521 })
522 .or_insert_with(|| StorageValue {
523 value: value.to_string(),
524 created_at: now,
525 updated_at: now,
526 });
527}
528
529async fn list_storage(
530 map: &Arc<RwLock<HashMap<(SessionId, String), StorageValue>>>,
531 session_id: SessionId,
532) -> Vec<(String, StorageValue)> {
533 let mut values: Vec<_> = map
534 .read()
535 .await
536 .iter()
537 .filter(|((sid, _), _)| *sid == session_id)
538 .map(|((_, key), value)| (key.clone(), value.clone()))
539 .collect();
540 values.sort_by(|a, b| a.0.cmp(&b.0));
541 values
542}
543
544fn normalize_path(path: &str) -> String {
545 if path == "/" || path.is_empty() {
546 return "/".to_string();
547 }
548 let mut normalized = if let Some(stripped) = path.strip_prefix("/workspace/") {
549 format!("/{}", stripped)
550 } else if path == "/workspace" {
551 "/".to_string()
552 } else if path.starts_with('/') {
553 path.to_string()
554 } else {
555 format!("/{}", path)
556 };
557
558 while normalized.len() > 1 && normalized.ends_with('/') {
559 normalized.pop();
560 }
561 normalized
562}
563
564fn root_directory(session_id: SessionId) -> SessionFile {
565 let now = Utc::now();
566 SessionFile {
567 id: Uuid::nil(),
568 session_id: session_id.uuid(),
569 path: "/".to_string(),
570 name: "/".to_string(),
571 content: None,
572 encoding: "text".to_string(),
573 is_directory: true,
574 is_readonly: false,
575 size_bytes: 0,
576 created_at: now,
577 updated_at: now,
578 }
579}
580
581fn file_info(file: &SessionFile) -> FileInfo {
582 FileInfo {
583 id: file.id,
584 session_id: file.session_id,
585 path: file.path.clone(),
586 name: file.name.clone(),
587 is_directory: file.is_directory,
588 is_readonly: file.is_readonly,
589 size_bytes: file.size_bytes,
590 created_at: file.created_at,
591 updated_at: file.updated_at,
592 }
593}