1use std::fs::{self, File, OpenOptions};
20use std::io::{BufRead, BufReader, Write};
21use std::path::{Path, PathBuf};
22
23use tempfile::NamedTempFile;
24
25use crate::error::{ServerError, ServerResult};
26use crate::threads::types::{
27 Run, RunError, RunStatus, RunStep, RunStepStatus, Thread, ThreadMessage,
28};
29
30pub struct ThreadStore {
35 root_dir: PathBuf,
37}
38
39impl ThreadStore {
40 pub fn new(dir: PathBuf) -> ServerResult<Self> {
44 fs::create_dir_all(&dir).map_err(|e| ServerError::IoError {
45 context: format!("create thread store root {}", dir.display()),
46 source: e,
47 })?;
48 Ok(Self { root_dir: dir })
49 }
50
51 pub fn create_thread(&self, thread: &Thread) -> ServerResult<()> {
58 let dir = self.thread_dir(&thread.id);
59 fs::create_dir_all(&dir).map_err(|e| ServerError::IoError {
60 context: format!("create thread directory {}", dir.display()),
61 source: e,
62 })?;
63 self.write_json_atomic(&dir, "meta.json", thread)?;
64 Ok(())
65 }
66
67 pub fn get_thread(&self, id: &str) -> ServerResult<Thread> {
72 let path = self.thread_dir(id).join("meta.json");
73 let content =
74 fs::read_to_string(&path).map_err(|_| ServerError::ThreadNotFound(id.to_string()))?;
75 serde_json::from_str(&content).map_err(ServerError::Serialization)
76 }
77
78 pub fn list_thread_ids(&self) -> ServerResult<Vec<String>> {
80 let mut ids = Vec::new();
81 for entry in fs::read_dir(&self.root_dir).map_err(|e| ServerError::IoError {
82 context: "list thread IDs".to_string(),
83 source: e,
84 })? {
85 let entry = entry.map_err(|e| ServerError::IoError {
86 context: "read directory entry".to_string(),
87 source: e,
88 })?;
89 if entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
90 if let Some(name) = entry.file_name().to_str() {
91 ids.push(name.to_string());
92 }
93 }
94 }
95 Ok(ids)
96 }
97
98 pub fn append_message(&self, thread_id: &str, msg: &ThreadMessage) -> ServerResult<()> {
107 let dir = self.thread_dir(thread_id);
108 if !dir.join("meta.json").exists() {
110 return Err(ServerError::ThreadNotFound(thread_id.to_string()));
111 }
112 let path = dir.join("messages.jsonl");
113 let json_line = serde_json::to_string(msg).map_err(ServerError::Serialization)?;
114 let mut file = OpenOptions::new()
115 .create(true)
116 .append(true)
117 .open(&path)
118 .map_err(|e| ServerError::IoError {
119 context: format!("open messages.jsonl for thread {thread_id}"),
120 source: e,
121 })?;
122 writeln!(file, "{}", json_line).map_err(|e| ServerError::IoError {
123 context: format!("write message to thread {thread_id}"),
124 source: e,
125 })?;
126 Ok(())
127 }
128
129 pub fn list_messages(&self, thread_id: &str) -> ServerResult<Vec<ThreadMessage>> {
135 let path = self.thread_dir(thread_id).join("messages.jsonl");
136 if !self.thread_dir(thread_id).join("meta.json").exists() {
137 return Err(ServerError::ThreadNotFound(thread_id.to_string()));
138 }
139 if !path.exists() {
140 return Ok(Vec::new());
141 }
142 let file = File::open(&path).map_err(|e| ServerError::IoError {
143 context: format!("open messages.jsonl for thread {thread_id}"),
144 source: e,
145 })?;
146 let reader = BufReader::new(file);
147 let mut messages = Vec::new();
148 for line_result in reader.lines() {
149 let line = line_result.map_err(|e| ServerError::IoError {
150 context: format!("read messages.jsonl for thread {thread_id}"),
151 source: e,
152 })?;
153 let trimmed = line.trim();
154 if trimmed.is_empty() {
155 continue;
156 }
157 if let Ok(msg) = serde_json::from_str::<ThreadMessage>(trimmed) {
158 messages.push(msg);
159 }
160 }
162 Ok(messages)
163 }
164
165 pub fn create_run(&self, thread_id: &str, run: &Run) -> ServerResult<()> {
172 let thread_dir = self.thread_dir(thread_id);
173 if !thread_dir.join("meta.json").exists() {
174 return Err(ServerError::ThreadNotFound(thread_id.to_string()));
175 }
176 let run_dir = self.run_dir(thread_id, &run.id);
177 fs::create_dir_all(&run_dir).map_err(|e| ServerError::IoError {
178 context: format!("create run directory {}", run_dir.display()),
179 source: e,
180 })?;
181 self.write_json_atomic(&run_dir, "status.json", run)?;
182 Ok(())
183 }
184
185 pub fn get_run(&self, thread_id: &str, run_id: &str) -> ServerResult<Run> {
189 let path = self.run_dir(thread_id, run_id).join("status.json");
190 let content =
191 fs::read_to_string(&path).map_err(|_| ServerError::RunNotFound(run_id.to_string()))?;
192 serde_json::from_str(&content).map_err(ServerError::Serialization)
193 }
194
195 pub fn update_run_status(
200 &self,
201 thread_id: &str,
202 run_id: &str,
203 status: RunStatus,
204 error: Option<RunError>,
205 ) -> ServerResult<()> {
206 let mut run = self.get_run(thread_id, run_id)?;
207
208 if run.status.is_terminal() {
209 return Err(ServerError::RunInTerminalState(format!(
210 "{} is already in terminal state {:?}",
211 run_id, run.status
212 )));
213 }
214
215 run.status = status;
216 run.last_error = error;
217 let run_dir = self.run_dir(thread_id, run_id);
218 self.write_json_atomic(&run_dir, "status.json", &run)?;
219 Ok(())
220 }
221
222 pub fn force_update_run_status(
227 &self,
228 thread_id: &str,
229 run_id: &str,
230 status: RunStatus,
231 error: Option<RunError>,
232 ) -> ServerResult<()> {
233 let mut run = self.get_run(thread_id, run_id)?;
234 run.status = status;
235 run.last_error = error;
236 let run_dir = self.run_dir(thread_id, run_id);
237 self.write_json_atomic(&run_dir, "status.json", &run)?;
238 Ok(())
239 }
240
241 pub fn steps_dir(&self, thread_id: &str, run_id: &str) -> PathBuf {
245 self.run_dir(thread_id, run_id).join("steps")
246 }
247
248 pub fn append_step(&self, thread_id: &str, run_id: &str, step: &RunStep) -> ServerResult<()> {
253 let run_dir = self.run_dir(thread_id, run_id);
254 if !run_dir.join("status.json").exists() {
255 return Err(ServerError::RunNotFound(run_id.to_string()));
256 }
257 let steps_dir = self.steps_dir(thread_id, run_id);
258 fs::create_dir_all(&steps_dir).map_err(|e| ServerError::IoError {
259 context: format!("create steps directory {}", steps_dir.display()),
260 source: e,
261 })?;
262 let filename = format!("{}.json", step.id);
263 self.write_json_atomic(&steps_dir, &filename, step)?;
264 Ok(())
265 }
266
267 pub fn list_steps(&self, thread_id: &str, run_id: &str) -> ServerResult<Vec<RunStep>> {
271 let run_dir = self.run_dir(thread_id, run_id);
272 if !run_dir.join("status.json").exists() {
273 return Err(ServerError::RunNotFound(run_id.to_string()));
274 }
275 let steps_dir = self.steps_dir(thread_id, run_id);
276 if !steps_dir.exists() {
277 return Ok(Vec::new());
278 }
279 let mut steps = Vec::new();
280 for entry in fs::read_dir(&steps_dir).map_err(|e| ServerError::IoError {
281 context: format!("read steps dir {}", steps_dir.display()),
282 source: e,
283 })? {
284 let entry = entry.map_err(|e| ServerError::IoError {
285 context: "read steps entry".to_string(),
286 source: e,
287 })?;
288 let path = entry.path();
289 if path.extension().and_then(|e| e.to_str()) != Some("json") {
290 continue;
291 }
292 if let Ok(content) = fs::read_to_string(&path) {
293 if let Ok(step) = serde_json::from_str::<RunStep>(&content) {
294 steps.push(step);
295 }
296 }
297 }
298 steps.sort_by_key(|s| s.created_at);
299 Ok(steps)
300 }
301
302 pub fn get_step(&self, thread_id: &str, run_id: &str, step_id: &str) -> ServerResult<RunStep> {
306 let steps_dir = self.steps_dir(thread_id, run_id);
307 let path = steps_dir.join(format!("{step_id}.json"));
308 let content = fs::read_to_string(&path)
309 .map_err(|_| ServerError::RunStepNotFound(step_id.to_string()))?;
310 serde_json::from_str(&content).map_err(ServerError::Serialization)
311 }
312
313 pub fn update_step_status(
317 &self,
318 thread_id: &str,
319 run_id: &str,
320 step_id: &str,
321 status: RunStepStatus,
322 ) -> ServerResult<()> {
323 let mut step = self.get_step(thread_id, run_id, step_id)?;
324 let now_u64 = std::time::SystemTime::now()
325 .duration_since(std::time::UNIX_EPOCH)
326 .map(|d| d.as_secs())
327 .unwrap_or(0);
328 match &status {
329 RunStepStatus::Completed => step.completed_at = Some(now_u64),
330 RunStepStatus::Failed => step.failed_at = Some(now_u64),
331 _ => {}
332 }
333 step.status = status;
334 let steps_dir = self.steps_dir(thread_id, run_id);
335 let filename = format!("{step_id}.json");
336 self.write_json_atomic(&steps_dir, &filename, &step)?;
337 Ok(())
338 }
339
340 pub fn thread_dir(&self, thread_id: &str) -> PathBuf {
344 self.root_dir.join(thread_id)
345 }
346
347 pub fn run_dir(&self, thread_id: &str, run_id: &str) -> PathBuf {
349 self.thread_dir(thread_id).join("runs").join(run_id)
350 }
351
352 fn write_json_atomic<T: serde::Serialize>(
359 &self,
360 dir: &Path,
361 filename: &str,
362 value: &T,
363 ) -> ServerResult<()> {
364 let json = serde_json::to_string_pretty(value).map_err(ServerError::Serialization)?;
365 let mut tmp = NamedTempFile::new_in(dir).map_err(|e| ServerError::IoError {
366 context: format!("create temp file in {}", dir.display()),
367 source: e,
368 })?;
369 tmp.write_all(json.as_bytes())
370 .map_err(|e| ServerError::IoError {
371 context: "write to temp file".to_string(),
372 source: e,
373 })?;
374 tmp.flush().map_err(|e| ServerError::IoError {
375 context: "flush temp file".to_string(),
376 source: e,
377 })?;
378 let target = dir.join(filename);
379 tmp.persist(&target).map_err(|e| ServerError::IoError {
380 context: format!("persist atomic write to {}", target.display()),
381 source: e.error,
382 })?;
383 Ok(())
384 }
385}
386
387#[cfg(test)]
390mod tests {
391 use super::*;
392 use crate::threads::types::{
393 Run, RunStatus, RunStep, RunStepStatus, RunStepType, Thread, ThreadMessage,
394 };
395 use std::env::temp_dir;
396 use uuid::Uuid;
397
398 fn make_store(tag: &str) -> ThreadStore {
399 let id = Uuid::new_v4().as_simple().to_string();
400 let dir = temp_dir().join(format!("oxillama_thread_store_test_{tag}_{id}"));
401 ThreadStore::new(dir).expect("ThreadStore::new should succeed")
402 }
403
404 fn make_thread(id: &str) -> Thread {
405 Thread {
406 id: id.to_string(),
407 object: "thread".to_string(),
408 created_at: 1_000_000,
409 metadata: serde_json::json!({}),
410 }
411 }
412
413 fn make_run(id: &str, thread_id: &str) -> Run {
414 Run {
415 id: id.to_string(),
416 object: "thread.run".to_string(),
417 created_at: 1_000_001,
418 thread_id: thread_id.to_string(),
419 status: RunStatus::Queued,
420 model: "test-model".to_string(),
421 last_error: None,
422 }
423 }
424
425 #[test]
426 fn store_creates_root_directory() {
427 let id = Uuid::new_v4().as_simple().to_string();
428 let dir = temp_dir().join(format!("oxillama_thread_store_create_{id}"));
429 let _ = fs::remove_dir_all(&dir);
430 ThreadStore::new(dir.clone()).expect("should create store");
431 assert!(dir.exists());
432 }
433
434 #[test]
435 fn create_and_get_thread() {
436 let store = make_store("get_thread");
437 let thread = make_thread("thread_aaa");
438 store.create_thread(&thread).expect("create_thread");
439 let got = store.get_thread("thread_aaa").expect("get_thread");
440 assert_eq!(got.id, "thread_aaa");
441 }
442
443 #[test]
444 fn get_thread_not_found_returns_error() {
445 let store = make_store("thread_notfound");
446 let err = store.get_thread("nonexistent").expect_err("should fail");
447 assert!(matches!(err, ServerError::ThreadNotFound(_)));
448 }
449
450 #[test]
451 fn append_and_list_messages_in_order() {
452 let store = make_store("messages_order");
453 let thread = make_thread("thread_msgs");
454 store.create_thread(&thread).expect("create_thread");
455
456 for i in 0..5_u32 {
457 let msg = ThreadMessage::new_user(
458 format!("msg_{i}"),
459 "thread_msgs".to_string(),
460 format!("hello {i}"),
461 );
462 store.append_message("thread_msgs", &msg).expect("append");
463 }
464
465 let msgs = store.list_messages("thread_msgs").expect("list");
466 assert_eq!(msgs.len(), 5);
467 for (i, m) in msgs.iter().enumerate() {
468 assert_eq!(m.text_content(), format!("hello {i}"));
469 }
470 }
471
472 #[test]
473 fn append_message_unknown_thread_errors() {
474 let store = make_store("append_no_thread");
475 let msg = ThreadMessage::new_user("msg_x".into(), "ghost".into(), "hi".into());
476 let err = store
477 .append_message("ghost", &msg)
478 .expect_err("should fail");
479 assert!(matches!(err, ServerError::ThreadNotFound(_)));
480 }
481
482 #[test]
483 fn create_and_get_run() {
484 let store = make_store("get_run");
485 let thread = make_thread("thread_run");
486 store.create_thread(&thread).expect("create");
487 let run = make_run("run_001", "thread_run");
488 store.create_run("thread_run", &run).expect("create_run");
489 let got = store.get_run("thread_run", "run_001").expect("get_run");
490 assert_eq!(got.id, "run_001");
491 assert_eq!(got.status, RunStatus::Queued);
492 }
493
494 #[test]
495 fn update_run_status_transitions() {
496 let store = make_store("run_status");
497 let thread = make_thread("thread_rs");
498 store.create_thread(&thread).expect("create");
499 let run = make_run("run_002", "thread_rs");
500 store.create_run("thread_rs", &run).expect("create_run");
501
502 store
503 .update_run_status("thread_rs", "run_002", RunStatus::InProgress, None)
504 .expect("to in-progress");
505
506 let got = store.get_run("thread_rs", "run_002").expect("get");
507 assert_eq!(got.status, RunStatus::InProgress);
508
509 store
510 .update_run_status("thread_rs", "run_002", RunStatus::Completed, None)
511 .expect("to completed");
512
513 let final_run = store.get_run("thread_rs", "run_002").expect("get final");
514 assert_eq!(final_run.status, RunStatus::Completed);
515 }
516
517 #[test]
518 fn update_terminal_run_returns_error() {
519 let store = make_store("run_terminal");
520 let thread = make_thread("thread_term");
521 store.create_thread(&thread).expect("create");
522 let run = make_run("run_003", "thread_term");
523 store.create_run("thread_term", &run).expect("create_run");
524 store
525 .update_run_status("thread_term", "run_003", RunStatus::Completed, None)
526 .expect("complete");
527 let err = store
528 .update_run_status("thread_term", "run_003", RunStatus::InProgress, None)
529 .expect_err("should reject terminal");
530 assert!(matches!(err, ServerError::RunInTerminalState(_)));
531 }
532
533 #[test]
534 fn get_run_not_found() {
535 let store = make_store("run_notfound");
536 let thread = make_thread("thread_nrf");
537 store.create_thread(&thread).expect("create");
538 let err = store
539 .get_run("thread_nrf", "ghost_run")
540 .expect_err("should fail");
541 assert!(matches!(err, ServerError::RunNotFound(_)));
542 }
543
544 #[test]
545 fn persistence_across_store_drop_and_recreate() {
546 let id = Uuid::new_v4().as_simple().to_string();
547 let dir = temp_dir().join(format!("oxillama_thread_persistence_{id}"));
548 let thread = make_thread("thread_persist");
549
550 {
551 let store = ThreadStore::new(dir.clone()).expect("create store");
552 store.create_thread(&thread).expect("create thread");
553 let msg =
554 ThreadMessage::new_user("msg_p1".into(), "thread_persist".into(), "data".into());
555 store
556 .append_message("thread_persist", &msg)
557 .expect("append");
558 }
559
560 let store2 = ThreadStore::new(dir).expect("reopen store");
562 let got = store2
563 .get_thread("thread_persist")
564 .expect("read after restart");
565 assert_eq!(got.id, "thread_persist");
566 let msgs = store2.list_messages("thread_persist").expect("messages");
567 assert_eq!(msgs.len(), 1);
568 assert_eq!(msgs[0].text_content(), "data");
569 }
570
571 #[test]
572 fn list_messages_empty_if_no_messages_yet() {
573 let store = make_store("empty_msgs");
574 let thread = make_thread("thread_empty");
575 store.create_thread(&thread).expect("create");
576 let msgs = store.list_messages("thread_empty").expect("list");
577 assert!(msgs.is_empty());
578 }
579
580 #[test]
581 fn atomic_write_leaves_no_partial_state() {
582 let store = make_store("atomic");
583 let thread = make_thread("thread_atomic");
584 store.create_thread(&thread).expect("create");
585 let run = make_run("run_atomic", "thread_atomic");
586 store.create_run("thread_atomic", &run).expect("create run");
587
588 for i in 0..20 {
590 let target_status = if i % 2 == 0 {
591 RunStatus::InProgress
592 } else {
593 RunStatus::Queued
594 };
595 store
597 .force_update_run_status("thread_atomic", "run_atomic", target_status, None)
598 .expect("force update");
599 let got = store
600 .get_run("thread_atomic", "run_atomic")
601 .expect("read mid-loop");
602 let _ = serde_json::to_string(&got.status).expect("serialize");
604 }
605 }
606
607 fn make_step(step_id: &str, run_id: &str, thread_id: &str) -> RunStep {
608 RunStep {
609 id: step_id.to_string(),
610 object: "thread.run.step".to_string(),
611 run_id: run_id.to_string(),
612 thread_id: thread_id.to_string(),
613 step_type: RunStepType::MessageCreation,
614 status: RunStepStatus::InProgress,
615 created_at: 1_000_002,
616 completed_at: None,
617 failed_at: None,
618 error: None,
619 step_details: None,
620 }
621 }
622
623 #[test]
624 fn step_list_returns_all_steps() {
625 let store = make_store("step_list");
626 let thread = make_thread("thread_sl");
627 store.create_thread(&thread).expect("create thread");
628 let run = make_run("run_sl", "thread_sl");
629 store.create_run("thread_sl", &run).expect("create run");
630
631 for i in 0..3_u32 {
632 let step = make_step(&format!("step_{i}"), "run_sl", "thread_sl");
633 store
634 .append_step("thread_sl", "run_sl", &step)
635 .expect("append step");
636 }
637
638 let steps = store.list_steps("thread_sl", "run_sl").expect("list steps");
639 assert_eq!(steps.len(), 3);
640 }
641
642 #[test]
643 fn step_get_returns_correct_step() {
644 let store = make_store("step_get");
645 let thread = make_thread("thread_sg");
646 store.create_thread(&thread).expect("create thread");
647 let run = make_run("run_sg", "thread_sg");
648 store.create_run("thread_sg", &run).expect("create run");
649
650 let step = make_step("step_target", "run_sg", "thread_sg");
651 store
652 .append_step("thread_sg", "run_sg", &step)
653 .expect("append");
654
655 let got = store
656 .get_step("thread_sg", "run_sg", "step_target")
657 .expect("get step");
658 assert_eq!(got.id, "step_target");
659 assert_eq!(got.step_type, RunStepType::MessageCreation);
660 assert_eq!(got.status, RunStepStatus::InProgress);
661 }
662
663 #[test]
664 fn step_not_found_returns_error() {
665 let store = make_store("step_notfound");
666 let thread = make_thread("thread_snf");
667 store.create_thread(&thread).expect("create thread");
668 let run = make_run("run_snf", "thread_snf");
669 store.create_run("thread_snf", &run).expect("create run");
670
671 let err = store
672 .get_step("thread_snf", "run_snf", "step_ghost")
673 .expect_err("should fail");
674 assert!(matches!(err, ServerError::RunStepNotFound(_)));
675 }
676
677 #[test]
678 fn step_update_status_to_completed() {
679 let store = make_store("step_complete");
680 let thread = make_thread("thread_sc");
681 store.create_thread(&thread).expect("create thread");
682 let run = make_run("run_sc", "thread_sc");
683 store.create_run("thread_sc", &run).expect("create run");
684
685 let step = make_step("step_comp", "run_sc", "thread_sc");
686 store
687 .append_step("thread_sc", "run_sc", &step)
688 .expect("append");
689
690 store
691 .update_step_status("thread_sc", "run_sc", "step_comp", RunStepStatus::Completed)
692 .expect("update status");
693
694 let got = store
695 .get_step("thread_sc", "run_sc", "step_comp")
696 .expect("get");
697 assert_eq!(got.status, RunStepStatus::Completed);
698 assert!(got.completed_at.is_some());
699 }
700
701 #[test]
702 fn force_update_run_status_bypasses_terminal_guard() {
703 let store = make_store("force_cancel");
704 let thread = make_thread("thread_fc");
705 store.create_thread(&thread).expect("create");
706 let run = make_run("run_fc", "thread_fc");
707 store.create_run("thread_fc", &run).expect("create run");
708 store
709 .force_update_run_status("thread_fc", "run_fc", RunStatus::Cancelled, None)
710 .expect("cancel");
711 let got = store.get_run("thread_fc", "run_fc").expect("read");
712 assert_eq!(got.status, RunStatus::Cancelled);
713 store
715 .force_update_run_status(
716 "thread_fc",
717 "run_fc",
718 RunStatus::Expired,
719 Some(RunError {
720 code: "expired".into(),
721 message: "timed out".into(),
722 }),
723 )
724 .expect("second force");
725 let final_run = store.get_run("thread_fc", "run_fc").expect("read final");
726 assert_eq!(final_run.status, RunStatus::Expired);
727 assert!(final_run.last_error.is_some());
728 }
729}