1use crate::output::{AgentOutput, ContentBlock, Event};
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use log::info;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::fs::{File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex};
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
/// How complete the captured log of a session is.
///
/// Ranked `Full > Partial > MetadataOnly` (see `rank_completeness`); a
/// writer's completeness can only be lowered, never raised back.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum LogCompleteness {
    /// The full event stream was captured.
    Full,
    /// Some events were captured, but gaps are known or suspected.
    Partial,
    /// Only session metadata is available, no event bodies.
    MetadataOnly,
}
22
/// Origin of a log event, recorded on every `AgentLogEvent`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum LogSourceKind {
    /// Emitted directly by this wrapper.
    Wrapper,
    /// Parsed from a file the provider wrote (presumably session transcripts
    /// — confirm against the adapter implementations).
    ProviderFile,
    /// Parsed from the provider's own log output.
    ProviderLog,
    /// Captured from the provider process's stdout.
    Stdout,
    /// Captured from the provider process's stderr.
    Stderr,
    /// Reconstructed during historical import (see `run_backfill`).
    Backfill,
}
33
/// Coarse classification of a provider tool, inferred from its name
/// (see `ToolKind::infer`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ToolKind {
    /// Shell/command execution tools.
    Shell,
    /// File reading/viewing tools.
    FileRead,
    /// File writing tools.
    FileWrite,
    /// File editing/patching tools.
    FileEdit,
    /// Search tools (grep/glob/find).
    Search,
    /// Sub-agent invocation tools.
    SubAgent,
    /// Web/fetch/HTTP tools.
    Web,
    /// Notebook tools.
    Notebook,
    /// Anything that matched no known pattern.
    Other,
}
61
62impl ToolKind {
63 pub fn infer(name: &str) -> Self {
69 let lower = name.to_lowercase();
70 if lower.contains("notebook") {
72 Self::Notebook
73 } else if lower.contains("bash") || lower.contains("shell") || lower == "exec" {
74 Self::Shell
75 } else if lower.contains("read") || lower == "view" || lower == "cat" {
76 Self::FileRead
77 } else if lower.contains("write") {
78 Self::FileWrite
79 } else if lower.contains("edit") || lower.contains("patch") {
80 Self::FileEdit
81 } else if lower.contains("grep")
82 || lower.contains("glob")
83 || lower.contains("search")
84 || lower == "find"
85 {
86 Self::Search
87 } else if lower.contains("agent") {
88 Self::SubAgent
89 } else if lower.contains("web") || lower.contains("fetch") || lower.contains("http") {
90 Self::Web
91 } else {
92 Self::Other
93 }
94 }
95}
96
/// Payload of a log line; internally tagged as `"type"` in snake_case and
/// flattened into `AgentLogEvent` on serialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum LogEventKind {
    /// First event of every session (emitted by `SessionLogCoordinator::start`).
    SessionStarted {
        command: String,
        model: Option<String>,
        cwd: Option<String>,
        resumed: bool,
        backfilled: bool,
    },
    /// A message authored by the user.
    UserMessage {
        role: String,
        content: String,
        message_id: Option<String>,
    },
    /// A text message produced by the assistant.
    AssistantMessage {
        content: String,
        message_id: Option<String>,
    },
    /// Model reasoning/thinking content.
    Reasoning {
        content: String,
        message_id: Option<String>,
    },
    /// The assistant invoked a tool.
    ToolCall {
        tool_name: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        tool_kind: Option<ToolKind>,
        tool_id: Option<String>,
        input: Option<Value>,
    },
    /// Outcome of a tool invocation.
    ToolResult {
        tool_name: Option<String>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        tool_kind: Option<ToolKind>,
        tool_id: Option<String>,
        success: Option<bool>,
        output: Option<String>,
        error: Option<String>,
        data: Option<Value>,
    },
    /// A permission prompt and whether it was granted.
    Permission {
        tool_name: String,
        description: String,
        granted: bool,
    },
    /// Free-form status/diagnostic message from the provider.
    ProviderStatus {
        message: String,
        data: Option<Value>,
    },
    /// A line captured from stderr.
    Stderr {
        message: String,
    },
    /// A provider line that could not be parsed; `raw` keeps the input.
    ParseWarning {
        message: String,
        raw: Option<String>,
    },
    /// The provider session was cleared/replaced.
    SessionCleared {
        old_session_id: Option<String>,
        new_session_id: Option<String>,
    },
    /// Terminal event appended by `SessionLogWriter::finish`.
    SessionEnded {
        success: bool,
        error: Option<String>,
    },
    /// Periodic liveness marker (see `run_heartbeat_loop`).
    Heartbeat {
        interval_secs: Option<u64>,
    },
    /// Token/cost accounting for the session.
    Usage {
        input_tokens: u64,
        output_tokens: u64,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cache_read_tokens: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        cache_creation_tokens: Option<u64>,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        total_cost_usd: Option<f64>,
    },
    /// Arbitrary user-facing event with a severity level.
    UserEvent {
        level: String,
        message: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        data: Option<Value>,
    },
}
181
/// One line of the session JSONL log: envelope metadata plus the flattened
/// `LogEventKind` payload (the payload's `type` tag sits beside these fields).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentLogEvent {
    /// Monotonic per-file sequence number (seeded by `next_sequence`).
    pub seq: u64,
    /// RFC 3339 timestamp of when the event was written.
    pub ts: String,
    /// Provider name from the session metadata.
    pub provider: String,
    /// Wrapper-assigned session id (also the log file name stem).
    pub wrapper_session_id: String,
    /// Provider-assigned session id, once known.
    #[serde(default)]
    pub provider_session_id: Option<String>,
    /// Where the event came from.
    pub source_kind: LogSourceKind,
    /// Completeness of the session log at write time.
    pub completeness: LogCompleteness,
    /// The event payload, flattened into this object.
    #[serde(flatten)]
    pub kind: LogEventKind,
}
195
/// Per-project session index, persisted as `index.json` next to the logs.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SessionLogIndex {
    pub sessions: Vec<SessionLogIndexEntry>,
}
200
/// One session's entry in the per-project `index.json`, maintained by
/// `SessionLogWriter::upsert_index`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionLogIndexEntry {
    pub wrapper_session_id: String,
    pub provider: String,
    #[serde(default)]
    pub provider_session_id: Option<String>,
    /// Absolute/display path of the JSONL log file.
    pub log_path: String,
    pub completeness: LogCompleteness,
    /// RFC 3339; preserved across index updates.
    pub started_at: String,
    /// Set by `SessionLogWriter::finish`; `None` while the session runs.
    #[serde(default)]
    pub ended_at: Option<String>,
    #[serde(default)]
    pub workspace_path: Option<String>,
    #[serde(default)]
    pub command: Option<String>,
    /// Provider files this log was derived from (see `add_source_path`).
    #[serde(default)]
    pub source_paths: Vec<String>,
    /// True when the session was imported by `run_backfill`.
    #[serde(default)]
    pub backfilled: bool,
}
221
/// Cross-project session index, persisted as `sessions_index.json`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GlobalSessionIndex {
    pub sessions: Vec<GlobalSessionEntry>,
}
228
/// One session's entry in the global index; keyed by `session_id`
/// (the wrapper session id — see `upsert_index`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalSessionEntry {
    pub session_id: String,
    /// Project directory name derived from the log path layout.
    pub project: String,
    pub log_path: String,
    pub provider: String,
    pub started_at: String,
}
237
238pub fn load_global_index(base_dir: &Path) -> Result<GlobalSessionIndex> {
239 let path = base_dir.join("sessions_index.json");
240 if !path.exists() {
241 return Ok(GlobalSessionIndex::default());
242 }
243 let content = std::fs::read_to_string(&path)
244 .with_context(|| format!("Failed to read {}", path.display()))?;
245 Ok(serde_json::from_str(&content).unwrap_or_default())
246}
247
248pub fn save_global_index(base_dir: &Path, index: &GlobalSessionIndex) -> Result<()> {
249 let path = base_dir.join("sessions_index.json");
250 let content = serde_json::to_string_pretty(index)?;
251 crate::file_util::atomic_write_str(&path, &content)
252 .with_context(|| format!("Failed to write {}", path.display()))
253}
254
255pub fn upsert_global_entry(base_dir: &Path, entry: GlobalSessionEntry) -> Result<()> {
256 let mut index = load_global_index(base_dir)?;
257 if let Some(existing) = index
258 .sessions
259 .iter_mut()
260 .find(|e| e.session_id == entry.session_id)
261 {
262 existing.log_path = entry.log_path;
263 existing.provider = entry.provider;
264 existing.started_at = entry.started_at;
265 existing.project = entry.project;
266 } else {
267 index.sessions.push(entry);
268 }
269 save_global_index(base_dir, &index)
270}
271
/// Persistent progress of `run_backfill`, stored as `backfill_state.json`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct BackfillState {
    /// Version of the last fully completed backfill pass; `run_backfill`
    /// is a no-op while this equals its current version.
    #[serde(default)]
    pub version: u32,
    /// `session_key` values of sessions already imported, for dedup.
    #[serde(default)]
    pub imported_session_keys: Vec<String>,
}
279
/// Immutable facts about a session, captured at writer creation and echoed
/// into every log event and index entry.
#[derive(Debug, Clone)]
pub struct SessionLogMetadata {
    pub provider: String,
    pub wrapper_session_id: String,
    /// May be filled in later via `SessionLogWriter::set_provider_session_id`.
    pub provider_session_id: Option<String>,
    pub workspace_path: Option<String>,
    /// Command line used to launch the session.
    pub command: String,
    pub model: Option<String>,
    /// True when this run resumes an earlier provider session.
    pub resumed: bool,
    /// True when the session is being reconstructed from history.
    pub backfilled: bool,
}
291
/// Context handed to live log adapters.
///
/// NOTE(review): not referenced elsewhere in this file — field semantics
/// (e.g. whether `root` is a repo root path) should be confirmed against the
/// adapter implementations that consume it.
#[derive(Debug, Clone)]
pub struct LiveLogContext {
    pub root: Option<String>,
    pub provider_session_id: Option<String>,
    pub workspace_path: Option<String>,
    pub started_at: DateTime<Utc>,
    pub is_worktree: bool,
}
302
/// A historical session reconstructed by a `HistoricalLogAdapter`, ready to
/// be written out by `run_backfill`.
#[derive(Debug, Clone)]
pub struct BackfilledSession {
    pub metadata: SessionLogMetadata,
    /// How much of the original session could be recovered.
    pub completeness: LogCompleteness,
    /// Provider files the events were reconstructed from.
    pub source_paths: Vec<String>,
    /// Events in emission order, each tagged with its origin.
    pub events: Vec<(LogSourceKind, LogEventKind)>,
}
310
/// Adapter that streams provider events into the session log while the
/// session is running (driven by `run_live_adapter`).
#[async_trait]
pub trait LiveLogAdapter: Send {
    /// Pull any newly available events and emit them through `writer`.
    async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()>;

    /// Final flush at shutdown; defaults to one last `poll`.
    async fn finalize(&mut self, writer: &SessionLogWriter) -> Result<()> {
        self.poll(writer).await
    }
}
319
/// Provider-specific importer of past sessions, used by `run_backfill`.
pub trait HistoricalLogAdapter: Send + Sync {
    /// Reconstruct historical sessions, optionally scoped to `root`.
    fn backfill(&self, root: Option<&str>) -> Result<Vec<BackfilledSession>>;
}
323
/// Cheaply cloneable handle to a session's JSONL log; all clones share one
/// mutex-guarded `WriterState`.
#[derive(Clone)]
pub struct SessionLogWriter {
    state: Arc<Mutex<WriterState>>,
}
328
/// Mutable state behind `SessionLogWriter`'s mutex.
struct WriterState {
    metadata: SessionLogMetadata,
    /// JSONL file appended to by `emit`.
    log_path: PathBuf,
    /// Per-project `index.json` updated by `upsert_index`.
    index_path: PathBuf,
    /// Sequence number the next emitted event will carry.
    next_seq: u64,
    /// Current completeness; only ever lowered (see `set_completeness`).
    completeness: LogCompleteness,
    /// When set, `upsert_index` also mirrors into the global index here.
    global_index_dir: Option<PathBuf>,
}
337
/// Owns a session's writer plus the background task (live adapter or
/// heartbeat loop) and the watch channel used to stop it.
pub struct SessionLogCoordinator {
    writer: SessionLogWriter,
    /// `Some` until `finish` consumes it to signal shutdown.
    stop_tx: Option<watch::Sender<bool>>,
    /// Handle of the spawned background task, joined in `finish`.
    task: Option<JoinHandle<Result<()>>>,
}
343
impl SessionLogWriter {
    /// Create (or reopen) the JSONL log for `metadata.wrapper_session_id`
    /// under `logs_dir/sessions/`, seed the sequence counter from any
    /// existing file contents, and register the session in `index.json`.
    pub fn create(logs_dir: &Path, metadata: SessionLogMetadata) -> Result<Self> {
        let sessions_dir = logs_dir.join("sessions");
        std::fs::create_dir_all(&sessions_dir).with_context(|| {
            format!(
                "Failed to create session log directory: {}",
                sessions_dir.display()
            )
        })?;
        let log_path = sessions_dir.join(format!("{}.jsonl", metadata.wrapper_session_id));
        if let Some(parent) = log_path.parent() {
            std::fs::create_dir_all(parent)
                .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
        }
        // Touch the file so later appends (and next_sequence) find it.
        if !log_path.exists() {
            File::create(&log_path)
                .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
        }

        // Reopening an existing log continues after its last recorded seq.
        let next_seq = next_sequence(&log_path)?;
        let index_path = logs_dir.join("index.json");
        let writer = Self {
            state: Arc::new(Mutex::new(WriterState {
                metadata: metadata.clone(),
                log_path: log_path.clone(),
                index_path,
                next_seq,
                completeness: LogCompleteness::Full,
                global_index_dir: None,
            })),
        };

        writer.upsert_index()?;
        Ok(writer)
    }

    /// Point this writer at the cross-project index directory; subsequent
    /// index updates will also be mirrored there.
    pub fn set_global_index_dir(&self, dir: PathBuf) -> Result<()> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        state.global_index_dir = Some(dir);
        Ok(())
    }

    /// Path of the JSONL file this writer appends to.
    pub fn log_path(&self) -> Result<PathBuf> {
        let state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        Ok(state.log_path.clone())
    }

    /// Provider-assigned session id, if recorded. Returns `None` both when
    /// unset and when the mutex is poisoned.
    pub fn get_provider_session_id(&self) -> Option<String> {
        self.state.lock().ok()?.metadata.provider_session_id.clone()
    }

    /// Record the provider's session id and refresh the index entry.
    pub fn set_provider_session_id(&self, provider_session_id: Option<String>) -> Result<()> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        state.metadata.provider_session_id = provider_session_id;
        // Release the lock before upsert_index, which locks again.
        drop(state);
        self.upsert_index()
    }

    /// Lower the recorded completeness; upgrades are ignored, so a session
    /// once marked Partial can never claim Full again (see `rank_completeness`).
    pub fn set_completeness(&self, completeness: LogCompleteness) -> Result<()> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        if rank_completeness(completeness) < rank_completeness(state.completeness) {
            state.completeness = completeness;
        }
        // Release the lock before upsert_index, which locks again.
        drop(state);
        self.upsert_index()
    }

    /// Append `path` to this session's `source_paths` in the index, if the
    /// session has an entry and the path is not already listed.
    pub fn add_source_path(&self, path: impl Into<String>) -> Result<()> {
        let path = path.into();
        let state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        let wrapper_session_id = state.metadata.wrapper_session_id.clone();
        let index_path = state.index_path.clone();
        drop(state);

        let mut index = load_index(&index_path)?;
        // Only rewrite the index file when something actually changed.
        if let Some(entry) = index
            .sessions
            .iter_mut()
            .find(|entry| entry.wrapper_session_id == wrapper_session_id)
            && !entry.source_paths.contains(&path)
        {
            entry.source_paths.push(path);
            save_index(&index_path, &index)?;
        }
        Ok(())
    }

    /// Append one event to the JSONL log, stamping it with the next sequence
    /// number, current timestamp, and the session's envelope metadata.
    ///
    /// The file is opened for append on every call; the mutex serializes
    /// concurrent emitters so seq numbers stay unique and ordered.
    pub fn emit(&self, source_kind: LogSourceKind, kind: LogEventKind) -> Result<()> {
        let mut state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        let event = AgentLogEvent {
            seq: state.next_seq,
            ts: Utc::now().to_rfc3339(),
            provider: state.metadata.provider.clone(),
            wrapper_session_id: state.metadata.wrapper_session_id.clone(),
            provider_session_id: state.metadata.provider_session_id.clone(),
            source_kind,
            completeness: state.completeness,
            kind,
        };
        state.next_seq += 1;

        let mut file = OpenOptions::new()
            .append(true)
            .open(&state.log_path)
            .with_context(|| format!("Failed to open {}", state.log_path.display()))?;
        writeln!(file, "{}", serde_json::to_string(&event)?)
            .with_context(|| format!("Failed to write {}", state.log_path.display()))?;
        Ok(())
    }

    /// Append the terminal `SessionEnded` event and stamp `ended_at` on the
    /// session's index entry.
    pub fn finish(&self, success: bool, error: Option<String>) -> Result<()> {
        self.emit(
            LogSourceKind::Wrapper,
            LogEventKind::SessionEnded { success, error },
        )?;
        let state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        let index_path = state.index_path.clone();
        let wrapper_session_id = state.metadata.wrapper_session_id.clone();
        drop(state);
        let mut index = load_index(&index_path)?;
        if let Some(entry) = index
            .sessions
            .iter_mut()
            .find(|entry| entry.wrapper_session_id == wrapper_session_id)
        {
            entry.ended_at = Some(Utc::now().to_rfc3339());
        }
        save_index(&index_path, &index)
    }

    /// Create or refresh this session's entry in the per-project
    /// `index.json`, then mirror it into the global index when configured.
    fn upsert_index(&self) -> Result<()> {
        let state = self
            .state
            .lock()
            .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
        let mut index = load_index(&state.index_path)?;
        let started_at;
        let existing = index
            .sessions
            .iter_mut()
            .find(|entry| entry.wrapper_session_id == state.metadata.wrapper_session_id);
        if let Some(entry) = existing {
            entry.provider_session_id = state.metadata.provider_session_id.clone();
            entry.log_path = state.log_path.to_string_lossy().to_string();
            entry.workspace_path = state.metadata.workspace_path.clone();
            entry.command = Some(state.metadata.command.clone());
            entry.completeness = state.completeness;
            entry.backfilled = state.metadata.backfilled;
            // Preserve the original start timestamp across updates.
            started_at = entry.started_at.clone();
        } else {
            started_at = Utc::now().to_rfc3339();
            index.sessions.push(SessionLogIndexEntry {
                wrapper_session_id: state.metadata.wrapper_session_id.clone(),
                provider: state.metadata.provider.clone(),
                provider_session_id: state.metadata.provider_session_id.clone(),
                log_path: state.log_path.to_string_lossy().to_string(),
                completeness: state.completeness,
                started_at: started_at.clone(),
                ended_at: None,
                workspace_path: state.metadata.workspace_path.clone(),
                command: Some(state.metadata.command.clone()),
                source_paths: Vec::new(),
                backfilled: state.metadata.backfilled,
            });
        }
        save_index(&state.index_path, &index)?;

        if let Some(ref global_dir) = state.global_index_dir {
            // Walk two levels up from index.json to recover the project
            // directory name.
            // NOTE(review): assumes a <project>/<logs>/index.json layout —
            // confirm against the code that chooses logs_dir.
            let project = state
                .index_path
                .parent()
                .and_then(|logs| logs.parent())
                .and_then(|proj| proj.file_name())
                .map(|n| n.to_string_lossy().to_string())
                .unwrap_or_default();
            // Global index mirroring is best-effort; errors are ignored.
            let _ = upsert_global_entry(
                global_dir,
                GlobalSessionEntry {
                    session_id: state.metadata.wrapper_session_id.clone(),
                    project,
                    log_path: state.log_path.to_string_lossy().to_string(),
                    provider: state.metadata.provider.clone(),
                    started_at,
                },
            );
        }

        Ok(())
    }
}
564
565impl SessionLogCoordinator {
566 pub fn start(
570 logs_dir: &Path,
571 metadata: SessionLogMetadata,
572 live_adapter: Option<Box<dyn LiveLogAdapter>>,
573 ) -> Result<Self> {
574 let writer = SessionLogWriter::create(logs_dir, metadata.clone())?;
575 writer.emit(
576 if metadata.backfilled {
577 LogSourceKind::Backfill
578 } else {
579 LogSourceKind::Wrapper
580 },
581 LogEventKind::SessionStarted {
582 command: metadata.command.clone(),
583 model: metadata.model.clone(),
584 cwd: metadata.workspace_path.clone(),
585 resumed: metadata.resumed,
586 backfilled: metadata.backfilled,
587 },
588 )?;
589
590 if let Some(adapter) = live_adapter {
591 let (stop_tx, stop_rx) = watch::channel(false);
592 let writer_clone = writer.clone();
593 let task =
594 tokio::spawn(async move { run_live_adapter(adapter, writer_clone, stop_rx).await });
595 Ok(Self {
596 writer,
597 stop_tx: Some(stop_tx),
598 task: Some(task),
599 })
600 } else {
601 let (stop_tx, stop_rx) = watch::channel(false);
603 let writer_clone = writer.clone();
604 let task = tokio::spawn(async move { run_heartbeat_loop(writer_clone, stop_rx).await });
605 Ok(Self {
606 writer,
607 stop_tx: Some(stop_tx),
608 task: Some(task),
609 })
610 }
611 }
612
613 pub fn writer(&self) -> &SessionLogWriter {
614 &self.writer
615 }
616
617 pub async fn finish(mut self, success: bool, error: Option<String>) -> Result<()> {
618 if let Some(stop_tx) = self.stop_tx.take() {
619 let _ = stop_tx.send(true);
620 }
621 if let Some(task) = self.task.take() {
622 task.await??;
623 }
624 self.writer.finish(success, error)
625 }
626}
627
628pub fn record_prompt(writer: &SessionLogWriter, prompt: Option<&str>) -> Result<()> {
629 if let Some(prompt) = prompt
630 && !prompt.trim().is_empty()
631 {
632 writer.emit(
633 LogSourceKind::Wrapper,
634 LogEventKind::UserMessage {
635 role: "user".to_string(),
636 content: prompt.to_string(),
637 message_id: None,
638 },
639 )?;
640 }
641 Ok(())
642}
643
/// Translate a completed `AgentOutput` into session log events, in the order
/// the provider emitted them, then append a trailing `Usage` record.
pub fn record_agent_output(writer: &SessionLogWriter, output: &AgentOutput) -> Result<()> {
    // Capture the provider session id unless it's the placeholder value.
    if !output.session_id.is_empty() && output.session_id != "unknown" {
        writer.set_provider_session_id(Some(output.session_id.clone()))?;
    }
    for event in &output.events {
        match event {
            // Assistant turns expand into one log event per content block.
            Event::AssistantMessage { content, .. } => {
                for block in content {
                    match block {
                        ContentBlock::Text { text } => {
                            writer.emit(
                                LogSourceKind::Wrapper,
                                LogEventKind::AssistantMessage {
                                    content: text.clone(),
                                    message_id: None,
                                },
                            )?;
                        }
                        ContentBlock::ToolUse { id, name, input } => {
                            writer.emit(
                                LogSourceKind::Wrapper,
                                LogEventKind::ToolCall {
                                    tool_kind: Some(ToolKind::infer(name)),
                                    tool_name: name.clone(),
                                    tool_id: Some(id.clone()),
                                    input: Some(input.clone()),
                                },
                            )?;
                        }
                    }
                }
            }
            Event::ToolExecution {
                tool_name,
                tool_id,
                result,
                ..
            } => {
                writer.emit(
                    LogSourceKind::Wrapper,
                    LogEventKind::ToolResult {
                        tool_kind: Some(ToolKind::infer(tool_name)),
                        tool_name: Some(tool_name.clone()),
                        tool_id: Some(tool_id.clone()),
                        success: Some(result.success),
                        output: result.output.clone(),
                        error: result.error.clone(),
                        data: result.data.clone(),
                    },
                )?;
            }
            Event::PermissionRequest {
                tool_name,
                description,
                granted,
            } => {
                writer.emit(
                    LogSourceKind::Wrapper,
                    LogEventKind::Permission {
                        tool_name: tool_name.clone(),
                        description: description.clone(),
                        granted: *granted,
                    },
                )?;
            }
            // Provider errors are logged as status events, not failures.
            Event::Error { message, details } => {
                writer.emit(
                    LogSourceKind::Wrapper,
                    LogEventKind::ProviderStatus {
                        message: message.clone(),
                        data: details.clone(),
                    },
                )?;
            }
            Event::Init {
                model,
                working_directory,
                metadata,
                ..
            } => {
                writer.emit(
                    LogSourceKind::Wrapper,
                    LogEventKind::ProviderStatus {
                        message: format!("Initialized {model}"),
                        data: Some(serde_json::json!({
                            "working_directory": working_directory,
                            "metadata": metadata,
                        })),
                    },
                )?;
            }
            // Only text blocks of user messages are logged; other block
            // kinds in user content are dropped here.
            Event::UserMessage { content } => {
                for block in content {
                    if let ContentBlock::Text { text } = block {
                        writer.emit(
                            LogSourceKind::Wrapper,
                            LogEventKind::UserMessage {
                                role: "user".to_string(),
                                content: text.clone(),
                                message_id: None,
                            },
                        )?;
                    }
                }
            }
            Event::Result {
                success,
                message,
                duration_ms,
                num_turns,
            } => {
                writer.emit(
                    LogSourceKind::Wrapper,
                    LogEventKind::ProviderStatus {
                        message: message
                            .clone()
                            .unwrap_or_else(|| "Result emitted".to_string()),
                        data: Some(serde_json::json!({
                            "success": success,
                            "duration_ms": duration_ms,
                            "num_turns": num_turns,
                        })),
                    },
                )?;
            }
            Event::TurnComplete {
                stop_reason,
                turn_index,
                usage,
            } => {
                writer.emit(
                    LogSourceKind::Wrapper,
                    LogEventKind::ProviderStatus {
                        message: format!("Turn {turn_index} complete"),
                        data: Some(serde_json::json!({
                            "stop_reason": stop_reason,
                            "turn_index": turn_index,
                            "usage": usage,
                        })),
                    },
                )?;
            }
        }
    }

    // Prefer full usage data; fall back to a zero-token record when only a
    // cost figure is available, so cost is never silently dropped.
    if let Some(ref usage) = output.usage {
        writer.emit(
            LogSourceKind::Wrapper,
            LogEventKind::Usage {
                input_tokens: usage.input_tokens,
                output_tokens: usage.output_tokens,
                cache_read_tokens: usage.cache_read_tokens,
                cache_creation_tokens: usage.cache_creation_tokens,
                total_cost_usd: output.total_cost_usd,
            },
        )?;
    } else if let Some(cost) = output.total_cost_usd {
        writer.emit(
            LogSourceKind::Wrapper,
            LogEventKind::Usage {
                input_tokens: 0,
                output_tokens: 0,
                cache_read_tokens: None,
                cache_creation_tokens: None,
                total_cost_usd: Some(cost),
            },
        )?;
    }

    Ok(())
}
817
818pub fn run_backfill(
822 logs_dir: &Path,
823 root: Option<&str>,
824 providers: &[&dyn HistoricalLogAdapter],
825) -> Result<usize> {
826 let state_path = logs_dir.join("backfill_state.json");
827 let mut state = load_backfill_state(&state_path)?;
828 let current_version = 1;
829 if state.version == current_version {
830 info!("Historical log import already completed for version {current_version}");
831 return Ok(0);
832 }
833
834 info!("Starting historical log import");
835 let mut imported = 0;
836 for provider in providers {
837 for session in provider.backfill(root)? {
838 let key = session_key(&session.metadata);
839 if state.imported_session_keys.contains(&key) {
840 info!(
841 "Skipping already imported historical session: {} {}",
842 session.metadata.provider,
843 session
844 .metadata
845 .provider_session_id
846 .as_deref()
847 .unwrap_or(&session.metadata.wrapper_session_id)
848 );
849 continue;
850 }
851
852 info!(
853 "Importing historical session: {} {}",
854 session.metadata.provider,
855 session
856 .metadata
857 .provider_session_id
858 .as_deref()
859 .unwrap_or(&session.metadata.wrapper_session_id)
860 );
861
862 let writer = SessionLogWriter::create(logs_dir, session.metadata.clone())?;
863 writer.set_completeness(session.completeness)?;
864 for source_path in session.source_paths {
865 info!(" source: {source_path}");
866 let _ = writer.add_source_path(source_path);
867 }
868 for (source_kind, event) in session.events {
869 writer.emit(source_kind, event)?;
870 }
871 writer.finish(true, None)?;
872 state.imported_session_keys.push(key);
873 imported += 1;
874 }
875 }
876
877 state.version = current_version;
878 save_backfill_state(&state_path, &state)?;
879 info!("Historical log import finished: {imported} session(s) imported");
880 Ok(imported)
881}
882
/// Seconds between liveness heartbeat events written to the session log.
const HEARTBEAT_INTERVAL_SECS: u64 = 10;
885
/// Background task driving a `LiveLogAdapter`: polls it continuously,
/// interleaving periodic heartbeats, until the stop flag flips to `true`,
/// then gives the adapter one final flush via `finalize`.
///
/// A poll error aborts the loop immediately (without finalize) and becomes
/// the task's result.
async fn run_live_adapter(
    mut adapter: Box<dyn LiveLogAdapter>,
    writer: SessionLogWriter,
    mut stop_rx: watch::Receiver<bool>,
) -> Result<()> {
    let mut last_heartbeat = tokio::time::Instant::now();
    loop {
        adapter.poll(&writer).await?;

        // Heartbeats are best-effort; a failed emit never aborts the loop.
        if last_heartbeat.elapsed().as_secs() >= HEARTBEAT_INTERVAL_SECS {
            let _ = writer.emit(
                LogSourceKind::Wrapper,
                LogEventKind::Heartbeat {
                    interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
                },
            );
            last_heartbeat = tokio::time::Instant::now();
        }

        // Wake on a stop-channel change or after 250ms, whichever comes
        // first, so polling stays responsive to shutdown.
        tokio::select! {
            changed = stop_rx.changed() => {
                if changed.is_ok() && *stop_rx.borrow() {
                    break;
                }
            }
            _ = tokio::time::sleep(std::time::Duration::from_millis(250)) => {}
        }
    }
    adapter.finalize(&writer).await
}
917
/// Fallback background task when no live adapter exists: emits a best-effort
/// heartbeat every `HEARTBEAT_INTERVAL_SECS` seconds until the stop flag
/// flips to `true`.
async fn run_heartbeat_loop(
    writer: SessionLogWriter,
    mut stop_rx: watch::Receiver<bool>,
) -> Result<()> {
    loop {
        tokio::select! {
            changed = stop_rx.changed() => {
                if changed.is_ok() && *stop_rx.borrow() {
                    break;
                }
            }
            _ = tokio::time::sleep(std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) => {
                // Emit failures are ignored; the heartbeat is advisory.
                let _ = writer.emit(
                    LogSourceKind::Wrapper,
                    LogEventKind::Heartbeat {
                        interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
                    },
                );
            }
        }
    }
    Ok(())
}
942
943fn next_sequence(path: &Path) -> Result<u64> {
944 if !path.exists() {
945 return Ok(1);
946 }
947 let file = File::open(path).with_context(|| format!("Failed to open {}", path.display()))?;
948 let reader = BufReader::new(file);
949 let mut last_seq = 0;
950 for line in reader.lines() {
951 let line = line?;
952 if line.trim().is_empty() {
953 continue;
954 }
955 if let Ok(value) = serde_json::from_str::<Value>(&line)
956 && let Some(seq) = value.get("seq").and_then(|seq| seq.as_u64())
957 {
958 last_seq = seq;
959 }
960 }
961 Ok(last_seq + 1)
962}
963
964fn load_index(path: &Path) -> Result<SessionLogIndex> {
965 if !path.exists() {
966 return Ok(SessionLogIndex::default());
967 }
968 let content = std::fs::read_to_string(path)
969 .with_context(|| format!("Failed to read {}", path.display()))?;
970 Ok(serde_json::from_str(&content).unwrap_or_default())
971}
972
973fn save_index(path: &Path, index: &SessionLogIndex) -> Result<()> {
974 let content = serde_json::to_string_pretty(index)?;
975 crate::file_util::atomic_write_str(path, &content)
976 .with_context(|| format!("Failed to write {}", path.display()))
977}
978
979fn load_backfill_state(path: &Path) -> Result<BackfillState> {
980 if !path.exists() {
981 return Ok(BackfillState::default());
982 }
983 let content = std::fs::read_to_string(path)
984 .with_context(|| format!("Failed to read {}", path.display()))?;
985 Ok(serde_json::from_str(&content).unwrap_or_default())
986}
987
988fn save_backfill_state(path: &Path, state: &BackfillState) -> Result<()> {
989 let content = serde_json::to_string_pretty(state)?;
990 crate::file_util::atomic_write_str(path, &content)
991 .with_context(|| format!("Failed to write {}", path.display()))
992}
993
994fn rank_completeness(completeness: LogCompleteness) -> u8 {
995 match completeness {
996 LogCompleteness::Full => 3,
997 LogCompleteness::Partial => 2,
998 LogCompleteness::MetadataOnly => 1,
999 }
1000}
1001
1002fn session_key(metadata: &SessionLogMetadata) -> String {
1003 format!(
1004 "{}:{}",
1005 metadata.provider,
1006 metadata
1007 .provider_session_id
1008 .as_deref()
1009 .unwrap_or(&metadata.wrapper_session_id)
1010 )
1011}
1012
// Unit tests live in a sibling file to keep this module readable.
#[cfg(test)]
#[path = "session_log_tests.rs"]
mod tests;