1use crate::output::{AgentOutput, ContentBlock, Event};
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use log::info;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::fs::{File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex};
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
15#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(rename_all = "snake_case")]
17pub enum LogCompleteness {
18 Full,
19 Partial,
20 MetadataOnly,
21}
22
23#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
24#[serde(rename_all = "snake_case")]
25pub enum LogSourceKind {
26 Wrapper,
27 ProviderFile,
28 ProviderLog,
29 Stdout,
30 Stderr,
31 Backfill,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum ToolKind {
42 Shell,
44 FileRead,
46 FileWrite,
48 FileEdit,
50 Search,
52 SubAgent,
54 Web,
56 Notebook,
58 Other,
60}
61
62impl ToolKind {
63 pub fn infer(name: &str) -> Self {
69 let lower = name.to_lowercase();
70 if lower.contains("notebook") {
72 Self::Notebook
73 } else if lower.contains("bash") || lower.contains("shell") || lower == "exec" {
74 Self::Shell
75 } else if lower.contains("read") || lower == "view" || lower == "cat" {
76 Self::FileRead
77 } else if lower.contains("write") {
78 Self::FileWrite
79 } else if lower.contains("edit") || lower.contains("patch") {
80 Self::FileEdit
81 } else if lower.contains("grep")
82 || lower.contains("glob")
83 || lower.contains("search")
84 || lower == "find"
85 {
86 Self::Search
87 } else if lower.contains("agent") {
88 Self::SubAgent
89 } else if lower.contains("web") || lower.contains("fetch") || lower.contains("http") {
90 Self::Web
91 } else {
92 Self::Other
93 }
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(tag = "type", rename_all = "snake_case")]
99pub enum LogEventKind {
100 SessionStarted {
101 command: String,
102 model: Option<String>,
103 cwd: Option<String>,
104 resumed: bool,
105 backfilled: bool,
106 },
107 UserMessage {
108 role: String,
109 content: String,
110 message_id: Option<String>,
111 },
112 AssistantMessage {
113 content: String,
114 message_id: Option<String>,
115 },
116 Reasoning {
117 content: String,
118 message_id: Option<String>,
119 },
120 ToolCall {
121 tool_name: String,
122 #[serde(default, skip_serializing_if = "Option::is_none")]
123 tool_kind: Option<ToolKind>,
124 tool_id: Option<String>,
125 input: Option<Value>,
126 },
127 ToolResult {
128 tool_name: Option<String>,
129 #[serde(default, skip_serializing_if = "Option::is_none")]
130 tool_kind: Option<ToolKind>,
131 tool_id: Option<String>,
132 success: Option<bool>,
133 output: Option<String>,
134 error: Option<String>,
135 data: Option<Value>,
136 },
137 Permission {
138 tool_name: String,
139 description: String,
140 granted: bool,
141 },
142 ProviderStatus {
143 message: String,
144 data: Option<Value>,
145 },
146 Stderr {
147 message: String,
148 },
149 ParseWarning {
150 message: String,
151 raw: Option<String>,
152 },
153 SessionCleared {
154 old_session_id: Option<String>,
155 new_session_id: Option<String>,
156 },
157 SessionEnded {
158 success: bool,
159 error: Option<String>,
160 },
161 Heartbeat {
162 interval_secs: Option<u64>,
163 },
164 Usage {
165 input_tokens: u64,
166 output_tokens: u64,
167 #[serde(default, skip_serializing_if = "Option::is_none")]
168 cache_read_tokens: Option<u64>,
169 #[serde(default, skip_serializing_if = "Option::is_none")]
170 cache_creation_tokens: Option<u64>,
171 #[serde(default, skip_serializing_if = "Option::is_none")]
172 total_cost_usd: Option<f64>,
173 },
174 UserEvent {
175 level: String,
176 message: String,
177 #[serde(default, skip_serializing_if = "Option::is_none")]
178 data: Option<Value>,
179 },
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct AgentLogEvent {
184 pub seq: u64,
185 pub ts: String,
186 pub provider: String,
187 pub wrapper_session_id: String,
188 #[serde(default)]
189 pub provider_session_id: Option<String>,
190 pub source_kind: LogSourceKind,
191 pub completeness: LogCompleteness,
192 #[serde(flatten)]
193 pub kind: LogEventKind,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, Default)]
197pub struct SessionLogIndex {
198 pub sessions: Vec<SessionLogIndexEntry>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct SessionLogIndexEntry {
203 pub wrapper_session_id: String,
204 pub provider: String,
205 #[serde(default)]
206 pub provider_session_id: Option<String>,
207 pub log_path: String,
208 pub completeness: LogCompleteness,
209 pub started_at: String,
210 #[serde(default)]
211 pub ended_at: Option<String>,
212 #[serde(default)]
213 pub workspace_path: Option<String>,
214 #[serde(default)]
215 pub command: Option<String>,
216 #[serde(default)]
217 pub source_paths: Vec<String>,
218 #[serde(default)]
219 pub backfilled: bool,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize, Default)]
225pub struct GlobalSessionIndex {
226 pub sessions: Vec<GlobalSessionEntry>,
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct GlobalSessionEntry {
231 pub session_id: String,
232 pub project: String,
233 pub log_path: String,
234 pub provider: String,
235 pub started_at: String,
236}
237
238pub fn load_global_index(base_dir: &Path) -> Result<GlobalSessionIndex> {
239 let path = base_dir.join("sessions_index.json");
240 if !path.exists() {
241 return Ok(GlobalSessionIndex::default());
242 }
243 let content = std::fs::read_to_string(&path)
244 .with_context(|| format!("Failed to read {}", path.display()))?;
245 Ok(serde_json::from_str(&content).unwrap_or_default())
246}
247
248pub fn save_global_index(base_dir: &Path, index: &GlobalSessionIndex) -> Result<()> {
249 let path = base_dir.join("sessions_index.json");
250 let content = serde_json::to_string_pretty(index)?;
251 crate::file_util::atomic_write_str(&path, &content)
252 .with_context(|| format!("Failed to write {}", path.display()))
253}
254
255pub fn upsert_global_entry(base_dir: &Path, entry: GlobalSessionEntry) -> Result<()> {
256 let mut index = load_global_index(base_dir)?;
257 if let Some(existing) = index
258 .sessions
259 .iter_mut()
260 .find(|e| e.session_id == entry.session_id)
261 {
262 existing.log_path = entry.log_path;
263 existing.provider = entry.provider;
264 existing.started_at = entry.started_at;
265 existing.project = entry.project;
266 } else {
267 index.sessions.push(entry);
268 }
269 save_global_index(base_dir, &index)
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize, Default)]
273pub struct BackfillState {
274 #[serde(default)]
275 pub version: u32,
276 #[serde(default)]
277 pub imported_session_keys: Vec<String>,
278}
279
/// Identity and launch details of a session, supplied when a log is created.
#[derive(Clone, Debug)]
pub struct SessionLogMetadata {
    pub provider: String,
    /// Names the log file (`<id>.jsonl`) and keys the index entry.
    pub wrapper_session_id: String,
    /// Provider-side id; may be filled in later through the writer.
    pub provider_session_id: Option<String>,
    /// Recorded as `cwd` in the `SessionStarted` event.
    pub workspace_path: Option<String>,
    pub command: String,
    pub model: Option<String>,
    pub resumed: bool,
    /// Marks sessions imported from history rather than recorded live.
    pub backfilled: bool,
}
291
292#[derive(Debug, Clone)]
293pub struct LiveLogContext {
294 pub root: Option<String>,
295 pub provider_session_id: Option<String>,
296 pub workspace_path: Option<String>,
297 pub started_at: DateTime<Utc>,
298 pub is_worktree: bool,
301}
302
303#[derive(Debug, Clone)]
304pub struct BackfilledSession {
305 pub metadata: SessionLogMetadata,
306 pub completeness: LogCompleteness,
307 pub source_paths: Vec<String>,
308 pub events: Vec<(LogSourceKind, LogEventKind)>,
309}
310
311#[async_trait]
312pub trait LiveLogAdapter: Send {
313 async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()>;
314
315 async fn finalize(&mut self, writer: &SessionLogWriter) -> Result<()> {
316 self.poll(writer).await
317 }
318}
319
320pub trait HistoricalLogAdapter: Send + Sync {
321 fn backfill(&self, root: Option<&str>) -> Result<Vec<BackfilledSession>>;
322}
323
324#[derive(Clone)]
325pub struct SessionLogWriter {
326 state: Arc<Mutex<WriterState>>,
327}
328
329struct WriterState {
330 metadata: SessionLogMetadata,
331 log_path: PathBuf,
332 index_path: PathBuf,
333 next_seq: u64,
334 completeness: LogCompleteness,
335 global_index_dir: Option<PathBuf>,
336}
337
338pub struct SessionLogCoordinator {
339 writer: SessionLogWriter,
340 stop_tx: Option<watch::Sender<bool>>,
341 task: Option<JoinHandle<Result<()>>>,
342}
343
344impl SessionLogWriter {
345 pub fn create(logs_dir: &Path, metadata: SessionLogMetadata) -> Result<Self> {
351 let sessions_dir = logs_dir.join("sessions");
352 std::fs::create_dir_all(&sessions_dir).with_context(|| {
353 format!(
354 "Failed to create session log directory: {}",
355 sessions_dir.display()
356 )
357 })?;
358 let log_path = sessions_dir.join(format!("{}.jsonl", metadata.wrapper_session_id));
359 if let Some(parent) = log_path.parent() {
360 std::fs::create_dir_all(parent)
361 .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
362 }
363 if !log_path.exists() {
364 File::create(&log_path)
365 .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
366 }
367
368 let next_seq = next_sequence(&log_path)?;
369 let index_path = logs_dir.join("index.json");
370 let writer = Self {
371 state: Arc::new(Mutex::new(WriterState {
372 metadata: metadata.clone(),
373 log_path: log_path.clone(),
374 index_path,
375 next_seq,
376 completeness: LogCompleteness::Full,
377 global_index_dir: None,
378 })),
379 };
380
381 writer.upsert_index()?;
382 Ok(writer)
383 }
384
385 pub fn set_global_index_dir(&self, dir: PathBuf) -> Result<()> {
388 let mut state = self
389 .state
390 .lock()
391 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
392 state.global_index_dir = Some(dir);
393 Ok(())
394 }
395
396 pub fn log_path(&self) -> Result<PathBuf> {
397 let state = self
398 .state
399 .lock()
400 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
401 Ok(state.log_path.clone())
402 }
403
404 pub fn get_provider_session_id(&self) -> Option<String> {
405 self.state.lock().ok()?.metadata.provider_session_id.clone()
406 }
407
408 pub fn set_provider_session_id(&self, provider_session_id: Option<String>) -> Result<()> {
409 let mut state = self
410 .state
411 .lock()
412 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
413 state.metadata.provider_session_id = provider_session_id;
414 drop(state);
415 self.upsert_index()
416 }
417
418 pub fn set_completeness(&self, completeness: LogCompleteness) -> Result<()> {
419 let mut state = self
420 .state
421 .lock()
422 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
423 if rank_completeness(completeness) < rank_completeness(state.completeness) {
424 state.completeness = completeness;
425 }
426 drop(state);
427 self.upsert_index()
428 }
429
430 pub fn add_source_path(&self, path: impl Into<String>) -> Result<()> {
431 let path = path.into();
432 let state = self
433 .state
434 .lock()
435 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
436 let wrapper_session_id = state.metadata.wrapper_session_id.clone();
437 let index_path = state.index_path.clone();
438 drop(state);
439
440 let mut index = load_index(&index_path)?;
441 if let Some(entry) = index
442 .sessions
443 .iter_mut()
444 .find(|entry| entry.wrapper_session_id == wrapper_session_id)
445 && !entry.source_paths.contains(&path)
446 {
447 entry.source_paths.push(path);
448 save_index(&index_path, &index)?;
449 }
450 Ok(())
451 }
452
453 pub fn emit(&self, source_kind: LogSourceKind, kind: LogEventKind) -> Result<()> {
454 let mut state = self
455 .state
456 .lock()
457 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
458 let event = AgentLogEvent {
459 seq: state.next_seq,
460 ts: Utc::now().to_rfc3339(),
461 provider: state.metadata.provider.clone(),
462 wrapper_session_id: state.metadata.wrapper_session_id.clone(),
463 provider_session_id: state.metadata.provider_session_id.clone(),
464 source_kind,
465 completeness: state.completeness,
466 kind,
467 };
468 state.next_seq += 1;
469
470 let mut file = OpenOptions::new()
471 .append(true)
472 .open(&state.log_path)
473 .with_context(|| format!("Failed to open {}", state.log_path.display()))?;
474 writeln!(file, "{}", serde_json::to_string(&event)?)
475 .with_context(|| format!("Failed to write {}", state.log_path.display()))?;
476 Ok(())
477 }
478
479 pub fn finish(&self, success: bool, error: Option<String>) -> Result<()> {
480 self.emit(
481 LogSourceKind::Wrapper,
482 LogEventKind::SessionEnded { success, error },
483 )?;
484 let state = self
485 .state
486 .lock()
487 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
488 let index_path = state.index_path.clone();
489 let wrapper_session_id = state.metadata.wrapper_session_id.clone();
490 drop(state);
491 let mut index = load_index(&index_path)?;
492 if let Some(entry) = index
493 .sessions
494 .iter_mut()
495 .find(|entry| entry.wrapper_session_id == wrapper_session_id)
496 {
497 entry.ended_at = Some(Utc::now().to_rfc3339());
498 }
499 save_index(&index_path, &index)
500 }
501
502 fn upsert_index(&self) -> Result<()> {
503 let state = self
504 .state
505 .lock()
506 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
507 let mut index = load_index(&state.index_path)?;
508 let started_at;
509 let existing = index
510 .sessions
511 .iter_mut()
512 .find(|entry| entry.wrapper_session_id == state.metadata.wrapper_session_id);
513 if let Some(entry) = existing {
514 entry.provider_session_id = state.metadata.provider_session_id.clone();
515 entry.log_path = state.log_path.to_string_lossy().to_string();
516 entry.workspace_path = state.metadata.workspace_path.clone();
517 entry.command = Some(state.metadata.command.clone());
518 entry.completeness = state.completeness;
519 entry.backfilled = state.metadata.backfilled;
520 started_at = entry.started_at.clone();
521 } else {
522 started_at = Utc::now().to_rfc3339();
523 index.sessions.push(SessionLogIndexEntry {
524 wrapper_session_id: state.metadata.wrapper_session_id.clone(),
525 provider: state.metadata.provider.clone(),
526 provider_session_id: state.metadata.provider_session_id.clone(),
527 log_path: state.log_path.to_string_lossy().to_string(),
528 completeness: state.completeness,
529 started_at: started_at.clone(),
530 ended_at: None,
531 workspace_path: state.metadata.workspace_path.clone(),
532 command: Some(state.metadata.command.clone()),
533 source_paths: Vec::new(),
534 backfilled: state.metadata.backfilled,
535 });
536 }
537 save_index(&state.index_path, &index)?;
538
539 if let Some(ref global_dir) = state.global_index_dir {
541 let project = state
543 .index_path
544 .parent()
545 .and_then(|logs| logs.parent())
546 .and_then(|proj| proj.file_name())
547 .map(|n| n.to_string_lossy().to_string())
548 .unwrap_or_default();
549 let _ = upsert_global_entry(
550 global_dir,
551 GlobalSessionEntry {
552 session_id: state.metadata.wrapper_session_id.clone(),
553 project,
554 log_path: state.log_path.to_string_lossy().to_string(),
555 provider: state.metadata.provider.clone(),
556 started_at,
557 },
558 );
559 }
560
561 Ok(())
562 }
563}
564
565impl SessionLogCoordinator {
566 pub fn start(
570 logs_dir: &Path,
571 metadata: SessionLogMetadata,
572 live_adapter: Option<Box<dyn LiveLogAdapter>>,
573 ) -> Result<Self> {
574 let writer = SessionLogWriter::create(logs_dir, metadata.clone())?;
575 writer.emit(
576 if metadata.backfilled {
577 LogSourceKind::Backfill
578 } else {
579 LogSourceKind::Wrapper
580 },
581 LogEventKind::SessionStarted {
582 command: metadata.command.clone(),
583 model: metadata.model.clone(),
584 cwd: metadata.workspace_path.clone(),
585 resumed: metadata.resumed,
586 backfilled: metadata.backfilled,
587 },
588 )?;
589
590 if let Some(adapter) = live_adapter {
591 let (stop_tx, stop_rx) = watch::channel(false);
592 let writer_clone = writer.clone();
593 let task =
594 tokio::spawn(async move { run_live_adapter(adapter, writer_clone, stop_rx).await });
595 Ok(Self {
596 writer,
597 stop_tx: Some(stop_tx),
598 task: Some(task),
599 })
600 } else {
601 let (stop_tx, stop_rx) = watch::channel(false);
603 let writer_clone = writer.clone();
604 let task = tokio::spawn(async move { run_heartbeat_loop(writer_clone, stop_rx).await });
605 Ok(Self {
606 writer,
607 stop_tx: Some(stop_tx),
608 task: Some(task),
609 })
610 }
611 }
612
613 pub fn writer(&self) -> &SessionLogWriter {
614 &self.writer
615 }
616
617 pub async fn finish(mut self, success: bool, error: Option<String>) -> Result<()> {
618 if let Some(stop_tx) = self.stop_tx.take() {
619 let _ = stop_tx.send(true);
620 }
621 if let Some(task) = self.task.take() {
622 task.await??;
623 }
624 self.writer.finish(success, error)
625 }
626}
627
628pub fn record_prompt(writer: &SessionLogWriter, prompt: Option<&str>) -> Result<()> {
629 if let Some(prompt) = prompt
630 && !prompt.trim().is_empty()
631 {
632 writer.emit(
633 LogSourceKind::Wrapper,
634 LogEventKind::UserMessage {
635 role: "user".to_string(),
636 content: prompt.to_string(),
637 message_id: None,
638 },
639 )?;
640 }
641 Ok(())
642}
643
644pub fn record_agent_output(writer: &SessionLogWriter, output: &AgentOutput) -> Result<()> {
645 if !output.session_id.is_empty() && output.session_id != "unknown" {
646 writer.set_provider_session_id(Some(output.session_id.clone()))?;
647 }
648 for event in &output.events {
649 match event {
650 Event::AssistantMessage { content, .. } => {
651 for block in content {
652 match block {
653 ContentBlock::Text { text } => {
654 writer.emit(
655 LogSourceKind::Wrapper,
656 LogEventKind::AssistantMessage {
657 content: text.clone(),
658 message_id: None,
659 },
660 )?;
661 }
662 ContentBlock::ToolUse { id, name, input } => {
663 writer.emit(
664 LogSourceKind::Wrapper,
665 LogEventKind::ToolCall {
666 tool_kind: Some(ToolKind::infer(name)),
667 tool_name: name.clone(),
668 tool_id: Some(id.clone()),
669 input: Some(input.clone()),
670 },
671 )?;
672 }
673 }
674 }
675 }
676 Event::ToolExecution {
677 tool_name,
678 tool_id,
679 result,
680 ..
681 } => {
682 writer.emit(
683 LogSourceKind::Wrapper,
684 LogEventKind::ToolResult {
685 tool_kind: Some(ToolKind::infer(tool_name)),
686 tool_name: Some(tool_name.clone()),
687 tool_id: Some(tool_id.clone()),
688 success: Some(result.success),
689 output: result.output.clone(),
690 error: result.error.clone(),
691 data: result.data.clone(),
692 },
693 )?;
694 }
695 Event::PermissionRequest {
696 tool_name,
697 description,
698 granted,
699 } => {
700 writer.emit(
701 LogSourceKind::Wrapper,
702 LogEventKind::Permission {
703 tool_name: tool_name.clone(),
704 description: description.clone(),
705 granted: *granted,
706 },
707 )?;
708 }
709 Event::Error { message, details } => {
710 writer.emit(
711 LogSourceKind::Wrapper,
712 LogEventKind::ProviderStatus {
713 message: message.clone(),
714 data: details.clone(),
715 },
716 )?;
717 }
718 Event::Init {
719 model,
720 working_directory,
721 metadata,
722 ..
723 } => {
724 writer.emit(
725 LogSourceKind::Wrapper,
726 LogEventKind::ProviderStatus {
727 message: format!("Initialized {}", model),
728 data: Some(serde_json::json!({
729 "working_directory": working_directory,
730 "metadata": metadata,
731 })),
732 },
733 )?;
734 }
735 Event::UserMessage { content } => {
736 for block in content {
737 if let ContentBlock::Text { text } = block {
738 writer.emit(
739 LogSourceKind::Wrapper,
740 LogEventKind::UserMessage {
741 role: "user".to_string(),
742 content: text.clone(),
743 message_id: None,
744 },
745 )?;
746 }
747 }
748 }
749 Event::Result {
750 success,
751 message,
752 duration_ms,
753 num_turns,
754 } => {
755 writer.emit(
756 LogSourceKind::Wrapper,
757 LogEventKind::ProviderStatus {
758 message: message
759 .clone()
760 .unwrap_or_else(|| "Result emitted".to_string()),
761 data: Some(serde_json::json!({
762 "success": success,
763 "duration_ms": duration_ms,
764 "num_turns": num_turns,
765 })),
766 },
767 )?;
768 }
769 }
770 }
771
772 if let Some(ref usage) = output.usage {
774 writer.emit(
775 LogSourceKind::Wrapper,
776 LogEventKind::Usage {
777 input_tokens: usage.input_tokens,
778 output_tokens: usage.output_tokens,
779 cache_read_tokens: usage.cache_read_tokens,
780 cache_creation_tokens: usage.cache_creation_tokens,
781 total_cost_usd: output.total_cost_usd,
782 },
783 )?;
784 } else if let Some(cost) = output.total_cost_usd {
785 writer.emit(
787 LogSourceKind::Wrapper,
788 LogEventKind::Usage {
789 input_tokens: 0,
790 output_tokens: 0,
791 cache_read_tokens: None,
792 cache_creation_tokens: None,
793 total_cost_usd: Some(cost),
794 },
795 )?;
796 }
797
798 Ok(())
799}
800
801pub fn run_backfill(
805 logs_dir: &Path,
806 root: Option<&str>,
807 providers: &[&dyn HistoricalLogAdapter],
808) -> Result<usize> {
809 let state_path = logs_dir.join("backfill_state.json");
810 let mut state = load_backfill_state(&state_path)?;
811 let current_version = 1;
812 if state.version == current_version {
813 info!(
814 "Historical log import already completed for version {}",
815 current_version
816 );
817 return Ok(0);
818 }
819
820 info!("Starting historical log import");
821 let mut imported = 0;
822 for provider in providers {
823 for session in provider.backfill(root)? {
824 let key = session_key(&session.metadata);
825 if state.imported_session_keys.contains(&key) {
826 info!(
827 "Skipping already imported historical session: {} {}",
828 session.metadata.provider,
829 session
830 .metadata
831 .provider_session_id
832 .as_deref()
833 .unwrap_or(&session.metadata.wrapper_session_id)
834 );
835 continue;
836 }
837
838 info!(
839 "Importing historical session: {} {}",
840 session.metadata.provider,
841 session
842 .metadata
843 .provider_session_id
844 .as_deref()
845 .unwrap_or(&session.metadata.wrapper_session_id)
846 );
847
848 let writer = SessionLogWriter::create(logs_dir, session.metadata.clone())?;
849 writer.set_completeness(session.completeness)?;
850 for source_path in session.source_paths {
851 info!(" source: {}", source_path);
852 let _ = writer.add_source_path(source_path);
853 }
854 for (source_kind, event) in session.events {
855 writer.emit(source_kind, event)?;
856 }
857 writer.finish(true, None)?;
858 state.imported_session_keys.push(key);
859 imported += 1;
860 }
861 }
862
863 state.version = current_version;
864 save_backfill_state(&state_path, &state)?;
865 info!(
866 "Historical log import finished: {} session(s) imported",
867 imported
868 );
869 Ok(imported)
870}
871
/// Seconds between `Heartbeat` events emitted by the background tasks.
const HEARTBEAT_INTERVAL_SECS: u64 = 10;
874
875async fn run_live_adapter(
876 mut adapter: Box<dyn LiveLogAdapter>,
877 writer: SessionLogWriter,
878 mut stop_rx: watch::Receiver<bool>,
879) -> Result<()> {
880 let mut last_heartbeat = tokio::time::Instant::now();
881 loop {
882 adapter.poll(&writer).await?;
883
884 if last_heartbeat.elapsed().as_secs() >= HEARTBEAT_INTERVAL_SECS {
886 let _ = writer.emit(
887 LogSourceKind::Wrapper,
888 LogEventKind::Heartbeat {
889 interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
890 },
891 );
892 last_heartbeat = tokio::time::Instant::now();
893 }
894
895 tokio::select! {
896 changed = stop_rx.changed() => {
897 if changed.is_ok() && *stop_rx.borrow() {
898 break;
899 }
900 }
901 _ = tokio::time::sleep(std::time::Duration::from_millis(250)) => {}
902 }
903 }
904 adapter.finalize(&writer).await
905}
906
907async fn run_heartbeat_loop(
909 writer: SessionLogWriter,
910 mut stop_rx: watch::Receiver<bool>,
911) -> Result<()> {
912 loop {
913 tokio::select! {
914 changed = stop_rx.changed() => {
915 if changed.is_ok() && *stop_rx.borrow() {
916 break;
917 }
918 }
919 _ = tokio::time::sleep(std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) => {
920 let _ = writer.emit(
921 LogSourceKind::Wrapper,
922 LogEventKind::Heartbeat {
923 interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
924 },
925 );
926 }
927 }
928 }
929 Ok(())
930}
931
932fn next_sequence(path: &Path) -> Result<u64> {
933 if !path.exists() {
934 return Ok(1);
935 }
936 let file = File::open(path).with_context(|| format!("Failed to open {}", path.display()))?;
937 let reader = BufReader::new(file);
938 let mut last_seq = 0;
939 for line in reader.lines() {
940 let line = line?;
941 if line.trim().is_empty() {
942 continue;
943 }
944 if let Ok(value) = serde_json::from_str::<Value>(&line)
945 && let Some(seq) = value.get("seq").and_then(|seq| seq.as_u64())
946 {
947 last_seq = seq;
948 }
949 }
950 Ok(last_seq + 1)
951}
952
953fn load_index(path: &Path) -> Result<SessionLogIndex> {
954 if !path.exists() {
955 return Ok(SessionLogIndex::default());
956 }
957 let content = std::fs::read_to_string(path)
958 .with_context(|| format!("Failed to read {}", path.display()))?;
959 Ok(serde_json::from_str(&content).unwrap_or_default())
960}
961
962fn save_index(path: &Path, index: &SessionLogIndex) -> Result<()> {
963 let content = serde_json::to_string_pretty(index)?;
964 crate::file_util::atomic_write_str(path, &content)
965 .with_context(|| format!("Failed to write {}", path.display()))
966}
967
968fn load_backfill_state(path: &Path) -> Result<BackfillState> {
969 if !path.exists() {
970 return Ok(BackfillState::default());
971 }
972 let content = std::fs::read_to_string(path)
973 .with_context(|| format!("Failed to read {}", path.display()))?;
974 Ok(serde_json::from_str(&content).unwrap_or_default())
975}
976
977fn save_backfill_state(path: &Path, state: &BackfillState) -> Result<()> {
978 let content = serde_json::to_string_pretty(state)?;
979 crate::file_util::atomic_write_str(path, &content)
980 .with_context(|| format!("Failed to write {}", path.display()))
981}
982
983fn rank_completeness(completeness: LogCompleteness) -> u8 {
984 match completeness {
985 LogCompleteness::Full => 3,
986 LogCompleteness::Partial => 2,
987 LogCompleteness::MetadataOnly => 1,
988 }
989}
990
991fn session_key(metadata: &SessionLogMetadata) -> String {
992 format!(
993 "{}:{}",
994 metadata.provider,
995 metadata
996 .provider_session_id
997 .as_deref()
998 .unwrap_or(&metadata.wrapper_session_id)
999 )
1000}
1001
1002#[cfg(test)]
1003#[path = "session_log_tests.rs"]
1004mod tests;