1use crate::output::{AgentOutput, ContentBlock, Event};
2use anyhow::{Context, Result};
3use async_trait::async_trait;
4use chrono::{DateTime, Utc};
5use log::info;
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::fs::{File, OpenOptions};
9use std::io::{BufRead, BufReader, Write};
10use std::path::{Path, PathBuf};
11use std::sync::{Arc, Mutex};
12use tokio::sync::watch;
13use tokio::task::JoinHandle;
14
15#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(rename_all = "snake_case")]
17pub enum LogCompleteness {
18 Full,
19 Partial,
20 MetadataOnly,
21}
22
23#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
24#[serde(rename_all = "snake_case")]
25pub enum LogSourceKind {
26 Wrapper,
27 ProviderFile,
28 ProviderLog,
29 Stdout,
30 Stderr,
31 Backfill,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
40#[serde(rename_all = "snake_case")]
41pub enum ToolKind {
42 Shell,
44 FileRead,
46 FileWrite,
48 FileEdit,
50 Search,
52 SubAgent,
54 Web,
56 Notebook,
58 Other,
60}
61
62impl ToolKind {
63 pub fn infer(name: &str) -> Self {
69 let lower = name.to_lowercase();
70 if lower.contains("notebook") {
72 Self::Notebook
73 } else if lower.contains("bash") || lower.contains("shell") || lower == "exec" {
74 Self::Shell
75 } else if lower.contains("read") || lower == "view" || lower == "cat" {
76 Self::FileRead
77 } else if lower.contains("write") {
78 Self::FileWrite
79 } else if lower.contains("edit") || lower.contains("patch") {
80 Self::FileEdit
81 } else if lower.contains("grep")
82 || lower.contains("glob")
83 || lower.contains("search")
84 || lower == "find"
85 {
86 Self::Search
87 } else if lower.contains("agent") {
88 Self::SubAgent
89 } else if lower.contains("web") || lower.contains("fetch") || lower.contains("http") {
90 Self::Web
91 } else {
92 Self::Other
93 }
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
98#[serde(tag = "type", rename_all = "snake_case")]
99pub enum LogEventKind {
100 SessionStarted {
101 command: String,
102 model: Option<String>,
103 cwd: Option<String>,
104 resumed: bool,
105 backfilled: bool,
106 },
107 UserMessage {
108 role: String,
109 content: String,
110 message_id: Option<String>,
111 },
112 AssistantMessage {
113 content: String,
114 message_id: Option<String>,
115 },
116 Reasoning {
117 content: String,
118 message_id: Option<String>,
119 },
120 ToolCall {
121 tool_name: String,
122 #[serde(default, skip_serializing_if = "Option::is_none")]
123 tool_kind: Option<ToolKind>,
124 tool_id: Option<String>,
125 input: Option<Value>,
126 },
127 ToolResult {
128 tool_name: Option<String>,
129 #[serde(default, skip_serializing_if = "Option::is_none")]
130 tool_kind: Option<ToolKind>,
131 tool_id: Option<String>,
132 success: Option<bool>,
133 output: Option<String>,
134 error: Option<String>,
135 data: Option<Value>,
136 },
137 Permission {
138 tool_name: String,
139 description: String,
140 granted: bool,
141 },
142 ProviderStatus {
143 message: String,
144 data: Option<Value>,
145 },
146 Stderr {
147 message: String,
148 },
149 ParseWarning {
150 message: String,
151 raw: Option<String>,
152 },
153 SessionCleared {
154 old_session_id: Option<String>,
155 new_session_id: Option<String>,
156 },
157 SessionEnded {
158 success: bool,
159 error: Option<String>,
160 },
161 Heartbeat {
162 interval_secs: Option<u64>,
163 },
164 Usage {
165 input_tokens: u64,
166 output_tokens: u64,
167 #[serde(default, skip_serializing_if = "Option::is_none")]
168 cache_read_tokens: Option<u64>,
169 #[serde(default, skip_serializing_if = "Option::is_none")]
170 cache_creation_tokens: Option<u64>,
171 #[serde(default, skip_serializing_if = "Option::is_none")]
172 total_cost_usd: Option<f64>,
173 },
174 UserEvent {
175 level: String,
176 message: String,
177 #[serde(default, skip_serializing_if = "Option::is_none")]
178 data: Option<Value>,
179 },
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct AgentLogEvent {
184 pub seq: u64,
185 pub ts: String,
186 pub provider: String,
187 pub wrapper_session_id: String,
188 #[serde(default)]
189 pub provider_session_id: Option<String>,
190 pub source_kind: LogSourceKind,
191 pub completeness: LogCompleteness,
192 #[serde(flatten)]
193 pub kind: LogEventKind,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize, Default)]
197pub struct SessionLogIndex {
198 pub sessions: Vec<SessionLogIndexEntry>,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct SessionLogIndexEntry {
203 pub wrapper_session_id: String,
204 pub provider: String,
205 #[serde(default)]
206 pub provider_session_id: Option<String>,
207 pub log_path: String,
208 pub completeness: LogCompleteness,
209 pub started_at: String,
210 #[serde(default)]
211 pub ended_at: Option<String>,
212 #[serde(default)]
213 pub workspace_path: Option<String>,
214 #[serde(default)]
215 pub command: Option<String>,
216 #[serde(default)]
217 pub source_paths: Vec<String>,
218 #[serde(default)]
219 pub backfilled: bool,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize, Default)]
225pub struct GlobalSessionIndex {
226 pub sessions: Vec<GlobalSessionEntry>,
227}
228
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct GlobalSessionEntry {
231 pub session_id: String,
232 pub project: String,
233 pub log_path: String,
234 pub provider: String,
235 pub started_at: String,
236}
237
238pub fn load_global_index(base_dir: &Path) -> Result<GlobalSessionIndex> {
239 let path = base_dir.join("sessions_index.json");
240 if !path.exists() {
241 return Ok(GlobalSessionIndex::default());
242 }
243 let content = std::fs::read_to_string(&path)
244 .with_context(|| format!("Failed to read {}", path.display()))?;
245 Ok(serde_json::from_str(&content).unwrap_or_default())
246}
247
248pub fn save_global_index(base_dir: &Path, index: &GlobalSessionIndex) -> Result<()> {
249 let path = base_dir.join("sessions_index.json");
250 let content = serde_json::to_string_pretty(index)?;
251 crate::file_util::atomic_write_str(&path, &content)
252 .with_context(|| format!("Failed to write {}", path.display()))
253}
254
255pub fn upsert_global_entry(base_dir: &Path, entry: GlobalSessionEntry) -> Result<()> {
256 let mut index = load_global_index(base_dir)?;
257 if let Some(existing) = index
258 .sessions
259 .iter_mut()
260 .find(|e| e.session_id == entry.session_id)
261 {
262 existing.log_path = entry.log_path;
263 existing.provider = entry.provider;
264 existing.started_at = entry.started_at;
265 existing.project = entry.project;
266 } else {
267 index.sessions.push(entry);
268 }
269 save_global_index(base_dir, &index)
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize, Default)]
273pub struct BackfillState {
274 #[serde(default)]
275 pub version: u32,
276 #[serde(default)]
277 pub imported_session_keys: Vec<String>,
278}
279
280#[derive(Debug, Clone)]
281pub struct SessionLogMetadata {
282 pub provider: String,
283 pub wrapper_session_id: String,
284 pub provider_session_id: Option<String>,
285 pub workspace_path: Option<String>,
286 pub command: String,
287 pub model: Option<String>,
288 pub resumed: bool,
289 pub backfilled: bool,
290}
291
292#[derive(Debug, Clone)]
293pub struct LiveLogContext {
294 pub root: Option<String>,
295 pub provider_session_id: Option<String>,
296 pub workspace_path: Option<String>,
297 pub started_at: DateTime<Utc>,
298 pub is_worktree: bool,
301}
302
303#[derive(Debug, Clone)]
304pub struct BackfilledSession {
305 pub metadata: SessionLogMetadata,
306 pub completeness: LogCompleteness,
307 pub source_paths: Vec<String>,
308 pub events: Vec<(LogSourceKind, LogEventKind)>,
309}
310
311#[async_trait]
312pub trait LiveLogAdapter: Send {
313 async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()>;
314
315 async fn finalize(&mut self, writer: &SessionLogWriter) -> Result<()> {
316 self.poll(writer).await
317 }
318}
319
320pub trait HistoricalLogAdapter: Send + Sync {
321 fn backfill(&self, root: Option<&str>) -> Result<Vec<BackfilledSession>>;
322}
323
324#[derive(Clone)]
325pub struct SessionLogWriter {
326 state: Arc<Mutex<WriterState>>,
327}
328
329struct WriterState {
330 metadata: SessionLogMetadata,
331 log_path: PathBuf,
332 index_path: PathBuf,
333 next_seq: u64,
334 completeness: LogCompleteness,
335 global_index_dir: Option<PathBuf>,
336}
337
338pub struct SessionLogCoordinator {
339 writer: SessionLogWriter,
340 stop_tx: Option<watch::Sender<bool>>,
341 task: Option<JoinHandle<Result<()>>>,
342}
343
344impl SessionLogWriter {
345 pub fn create(logs_dir: &Path, metadata: SessionLogMetadata) -> Result<Self> {
351 let sessions_dir = logs_dir.join("sessions");
352 std::fs::create_dir_all(&sessions_dir).with_context(|| {
353 format!(
354 "Failed to create session log directory: {}",
355 sessions_dir.display()
356 )
357 })?;
358 let log_path = sessions_dir.join(format!("{}.jsonl", metadata.wrapper_session_id));
359 if let Some(parent) = log_path.parent() {
360 std::fs::create_dir_all(parent)
361 .with_context(|| format!("Failed to create directory: {}", parent.display()))?;
362 }
363 if !log_path.exists() {
364 File::create(&log_path)
365 .with_context(|| format!("Failed to create log file: {}", log_path.display()))?;
366 }
367
368 let next_seq = next_sequence(&log_path)?;
369 let index_path = logs_dir.join("index.json");
370 let writer = Self {
371 state: Arc::new(Mutex::new(WriterState {
372 metadata: metadata.clone(),
373 log_path: log_path.clone(),
374 index_path,
375 next_seq,
376 completeness: LogCompleteness::Full,
377 global_index_dir: None,
378 })),
379 };
380
381 writer.upsert_index()?;
382 Ok(writer)
383 }
384
385 pub fn set_global_index_dir(&self, dir: PathBuf) -> Result<()> {
388 let mut state = self
389 .state
390 .lock()
391 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
392 state.global_index_dir = Some(dir);
393 Ok(())
394 }
395
396 pub fn log_path(&self) -> Result<PathBuf> {
397 let state = self
398 .state
399 .lock()
400 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
401 Ok(state.log_path.clone())
402 }
403
404 pub fn get_provider_session_id(&self) -> Option<String> {
405 self.state.lock().ok()?.metadata.provider_session_id.clone()
406 }
407
408 pub fn set_provider_session_id(&self, provider_session_id: Option<String>) -> Result<()> {
409 let mut state = self
410 .state
411 .lock()
412 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
413 state.metadata.provider_session_id = provider_session_id;
414 drop(state);
415 self.upsert_index()
416 }
417
418 pub fn set_completeness(&self, completeness: LogCompleteness) -> Result<()> {
419 let mut state = self
420 .state
421 .lock()
422 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
423 if rank_completeness(completeness) < rank_completeness(state.completeness) {
424 state.completeness = completeness;
425 }
426 drop(state);
427 self.upsert_index()
428 }
429
430 pub fn add_source_path(&self, path: impl Into<String>) -> Result<()> {
431 let path = path.into();
432 let state = self
433 .state
434 .lock()
435 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
436 let wrapper_session_id = state.metadata.wrapper_session_id.clone();
437 let index_path = state.index_path.clone();
438 drop(state);
439
440 let mut index = load_index(&index_path)?;
441 if let Some(entry) = index
442 .sessions
443 .iter_mut()
444 .find(|entry| entry.wrapper_session_id == wrapper_session_id)
445 && !entry.source_paths.contains(&path)
446 {
447 entry.source_paths.push(path);
448 save_index(&index_path, &index)?;
449 }
450 Ok(())
451 }
452
453 pub fn emit(&self, source_kind: LogSourceKind, kind: LogEventKind) -> Result<()> {
454 let mut state = self
455 .state
456 .lock()
457 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
458 let event = AgentLogEvent {
459 seq: state.next_seq,
460 ts: Utc::now().to_rfc3339(),
461 provider: state.metadata.provider.clone(),
462 wrapper_session_id: state.metadata.wrapper_session_id.clone(),
463 provider_session_id: state.metadata.provider_session_id.clone(),
464 source_kind,
465 completeness: state.completeness,
466 kind,
467 };
468 state.next_seq += 1;
469
470 let mut file = OpenOptions::new()
471 .append(true)
472 .open(&state.log_path)
473 .with_context(|| format!("Failed to open {}", state.log_path.display()))?;
474 writeln!(file, "{}", serde_json::to_string(&event)?)
475 .with_context(|| format!("Failed to write {}", state.log_path.display()))?;
476 Ok(())
477 }
478
479 pub fn finish(&self, success: bool, error: Option<String>) -> Result<()> {
480 self.emit(
481 LogSourceKind::Wrapper,
482 LogEventKind::SessionEnded { success, error },
483 )?;
484 let state = self
485 .state
486 .lock()
487 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
488 let index_path = state.index_path.clone();
489 let wrapper_session_id = state.metadata.wrapper_session_id.clone();
490 drop(state);
491 let mut index = load_index(&index_path)?;
492 if let Some(entry) = index
493 .sessions
494 .iter_mut()
495 .find(|entry| entry.wrapper_session_id == wrapper_session_id)
496 {
497 entry.ended_at = Some(Utc::now().to_rfc3339());
498 }
499 save_index(&index_path, &index)
500 }
501
502 fn upsert_index(&self) -> Result<()> {
503 let state = self
504 .state
505 .lock()
506 .map_err(|_| anyhow::anyhow!("Log mutex poisoned"))?;
507 let mut index = load_index(&state.index_path)?;
508 let started_at;
509 let existing = index
510 .sessions
511 .iter_mut()
512 .find(|entry| entry.wrapper_session_id == state.metadata.wrapper_session_id);
513 if let Some(entry) = existing {
514 entry.provider_session_id = state.metadata.provider_session_id.clone();
515 entry.log_path = state.log_path.to_string_lossy().to_string();
516 entry.workspace_path = state.metadata.workspace_path.clone();
517 entry.command = Some(state.metadata.command.clone());
518 entry.completeness = state.completeness;
519 entry.backfilled = state.metadata.backfilled;
520 started_at = entry.started_at.clone();
521 } else {
522 started_at = Utc::now().to_rfc3339();
523 index.sessions.push(SessionLogIndexEntry {
524 wrapper_session_id: state.metadata.wrapper_session_id.clone(),
525 provider: state.metadata.provider.clone(),
526 provider_session_id: state.metadata.provider_session_id.clone(),
527 log_path: state.log_path.to_string_lossy().to_string(),
528 completeness: state.completeness,
529 started_at: started_at.clone(),
530 ended_at: None,
531 workspace_path: state.metadata.workspace_path.clone(),
532 command: Some(state.metadata.command.clone()),
533 source_paths: Vec::new(),
534 backfilled: state.metadata.backfilled,
535 });
536 }
537 save_index(&state.index_path, &index)?;
538
539 if let Some(ref global_dir) = state.global_index_dir {
541 let project = state
543 .index_path
544 .parent()
545 .and_then(|logs| logs.parent())
546 .and_then(|proj| proj.file_name())
547 .map(|n| n.to_string_lossy().to_string())
548 .unwrap_or_default();
549 let _ = upsert_global_entry(
550 global_dir,
551 GlobalSessionEntry {
552 session_id: state.metadata.wrapper_session_id.clone(),
553 project,
554 log_path: state.log_path.to_string_lossy().to_string(),
555 provider: state.metadata.provider.clone(),
556 started_at,
557 },
558 );
559 }
560
561 Ok(())
562 }
563}
564
565impl SessionLogCoordinator {
566 pub fn start(
570 logs_dir: &Path,
571 metadata: SessionLogMetadata,
572 live_adapter: Option<Box<dyn LiveLogAdapter>>,
573 ) -> Result<Self> {
574 let writer = SessionLogWriter::create(logs_dir, metadata.clone())?;
575 writer.emit(
576 if metadata.backfilled {
577 LogSourceKind::Backfill
578 } else {
579 LogSourceKind::Wrapper
580 },
581 LogEventKind::SessionStarted {
582 command: metadata.command.clone(),
583 model: metadata.model.clone(),
584 cwd: metadata.workspace_path.clone(),
585 resumed: metadata.resumed,
586 backfilled: metadata.backfilled,
587 },
588 )?;
589
590 if let Some(adapter) = live_adapter {
591 let (stop_tx, stop_rx) = watch::channel(false);
592 let writer_clone = writer.clone();
593 let task =
594 tokio::spawn(async move { run_live_adapter(adapter, writer_clone, stop_rx).await });
595 Ok(Self {
596 writer,
597 stop_tx: Some(stop_tx),
598 task: Some(task),
599 })
600 } else {
601 let (stop_tx, stop_rx) = watch::channel(false);
603 let writer_clone = writer.clone();
604 let task = tokio::spawn(async move { run_heartbeat_loop(writer_clone, stop_rx).await });
605 Ok(Self {
606 writer,
607 stop_tx: Some(stop_tx),
608 task: Some(task),
609 })
610 }
611 }
612
613 pub fn writer(&self) -> &SessionLogWriter {
614 &self.writer
615 }
616
617 pub async fn finish(mut self, success: bool, error: Option<String>) -> Result<()> {
618 if let Some(stop_tx) = self.stop_tx.take() {
619 let _ = stop_tx.send(true);
620 }
621 if let Some(task) = self.task.take() {
622 task.await??;
623 }
624 self.writer.finish(success, error)
625 }
626}
627
628pub fn record_prompt(writer: &SessionLogWriter, prompt: Option<&str>) -> Result<()> {
629 if let Some(prompt) = prompt
630 && !prompt.trim().is_empty()
631 {
632 writer.emit(
633 LogSourceKind::Wrapper,
634 LogEventKind::UserMessage {
635 role: "user".to_string(),
636 content: prompt.to_string(),
637 message_id: None,
638 },
639 )?;
640 }
641 Ok(())
642}
643
644pub fn record_agent_output(writer: &SessionLogWriter, output: &AgentOutput) -> Result<()> {
645 if !output.session_id.is_empty() && output.session_id != "unknown" {
646 writer.set_provider_session_id(Some(output.session_id.clone()))?;
647 }
648 for event in &output.events {
649 match event {
650 Event::AssistantMessage { content, .. } => {
651 for block in content {
652 match block {
653 ContentBlock::Text { text } => {
654 writer.emit(
655 LogSourceKind::Wrapper,
656 LogEventKind::AssistantMessage {
657 content: text.clone(),
658 message_id: None,
659 },
660 )?;
661 }
662 ContentBlock::ToolUse { id, name, input } => {
663 writer.emit(
664 LogSourceKind::Wrapper,
665 LogEventKind::ToolCall {
666 tool_kind: Some(ToolKind::infer(name)),
667 tool_name: name.clone(),
668 tool_id: Some(id.clone()),
669 input: Some(input.clone()),
670 },
671 )?;
672 }
673 }
674 }
675 }
676 Event::ToolExecution {
677 tool_name,
678 tool_id,
679 result,
680 ..
681 } => {
682 writer.emit(
683 LogSourceKind::Wrapper,
684 LogEventKind::ToolResult {
685 tool_kind: Some(ToolKind::infer(tool_name)),
686 tool_name: Some(tool_name.clone()),
687 tool_id: Some(tool_id.clone()),
688 success: Some(result.success),
689 output: result.output.clone(),
690 error: result.error.clone(),
691 data: result.data.clone(),
692 },
693 )?;
694 }
695 Event::PermissionRequest {
696 tool_name,
697 description,
698 granted,
699 } => {
700 writer.emit(
701 LogSourceKind::Wrapper,
702 LogEventKind::Permission {
703 tool_name: tool_name.clone(),
704 description: description.clone(),
705 granted: *granted,
706 },
707 )?;
708 }
709 Event::Error { message, details } => {
710 writer.emit(
711 LogSourceKind::Wrapper,
712 LogEventKind::ProviderStatus {
713 message: message.clone(),
714 data: details.clone(),
715 },
716 )?;
717 }
718 Event::Init {
719 model,
720 working_directory,
721 metadata,
722 ..
723 } => {
724 writer.emit(
725 LogSourceKind::Wrapper,
726 LogEventKind::ProviderStatus {
727 message: format!("Initialized {}", model),
728 data: Some(serde_json::json!({
729 "working_directory": working_directory,
730 "metadata": metadata,
731 })),
732 },
733 )?;
734 }
735 Event::UserMessage { content } => {
736 for block in content {
737 if let ContentBlock::Text { text } = block {
738 writer.emit(
739 LogSourceKind::Wrapper,
740 LogEventKind::UserMessage {
741 role: "user".to_string(),
742 content: text.clone(),
743 message_id: None,
744 },
745 )?;
746 }
747 }
748 }
749 Event::Result {
750 success,
751 message,
752 duration_ms,
753 num_turns,
754 } => {
755 writer.emit(
756 LogSourceKind::Wrapper,
757 LogEventKind::ProviderStatus {
758 message: message
759 .clone()
760 .unwrap_or_else(|| "Result emitted".to_string()),
761 data: Some(serde_json::json!({
762 "success": success,
763 "duration_ms": duration_ms,
764 "num_turns": num_turns,
765 })),
766 },
767 )?;
768 }
769 Event::TurnComplete {
770 stop_reason,
771 turn_index,
772 usage,
773 } => {
774 writer.emit(
775 LogSourceKind::Wrapper,
776 LogEventKind::ProviderStatus {
777 message: format!("Turn {} complete", turn_index),
778 data: Some(serde_json::json!({
779 "stop_reason": stop_reason,
780 "turn_index": turn_index,
781 "usage": usage,
782 })),
783 },
784 )?;
785 }
786 }
787 }
788
789 if let Some(ref usage) = output.usage {
791 writer.emit(
792 LogSourceKind::Wrapper,
793 LogEventKind::Usage {
794 input_tokens: usage.input_tokens,
795 output_tokens: usage.output_tokens,
796 cache_read_tokens: usage.cache_read_tokens,
797 cache_creation_tokens: usage.cache_creation_tokens,
798 total_cost_usd: output.total_cost_usd,
799 },
800 )?;
801 } else if let Some(cost) = output.total_cost_usd {
802 writer.emit(
804 LogSourceKind::Wrapper,
805 LogEventKind::Usage {
806 input_tokens: 0,
807 output_tokens: 0,
808 cache_read_tokens: None,
809 cache_creation_tokens: None,
810 total_cost_usd: Some(cost),
811 },
812 )?;
813 }
814
815 Ok(())
816}
817
818pub fn run_backfill(
822 logs_dir: &Path,
823 root: Option<&str>,
824 providers: &[&dyn HistoricalLogAdapter],
825) -> Result<usize> {
826 let state_path = logs_dir.join("backfill_state.json");
827 let mut state = load_backfill_state(&state_path)?;
828 let current_version = 1;
829 if state.version == current_version {
830 info!(
831 "Historical log import already completed for version {}",
832 current_version
833 );
834 return Ok(0);
835 }
836
837 info!("Starting historical log import");
838 let mut imported = 0;
839 for provider in providers {
840 for session in provider.backfill(root)? {
841 let key = session_key(&session.metadata);
842 if state.imported_session_keys.contains(&key) {
843 info!(
844 "Skipping already imported historical session: {} {}",
845 session.metadata.provider,
846 session
847 .metadata
848 .provider_session_id
849 .as_deref()
850 .unwrap_or(&session.metadata.wrapper_session_id)
851 );
852 continue;
853 }
854
855 info!(
856 "Importing historical session: {} {}",
857 session.metadata.provider,
858 session
859 .metadata
860 .provider_session_id
861 .as_deref()
862 .unwrap_or(&session.metadata.wrapper_session_id)
863 );
864
865 let writer = SessionLogWriter::create(logs_dir, session.metadata.clone())?;
866 writer.set_completeness(session.completeness)?;
867 for source_path in session.source_paths {
868 info!(" source: {}", source_path);
869 let _ = writer.add_source_path(source_path);
870 }
871 for (source_kind, event) in session.events {
872 writer.emit(source_kind, event)?;
873 }
874 writer.finish(true, None)?;
875 state.imported_session_keys.push(key);
876 imported += 1;
877 }
878 }
879
880 state.version = current_version;
881 save_backfill_state(&state_path, &state)?;
882 info!(
883 "Historical log import finished: {} session(s) imported",
884 imported
885 );
886 Ok(imported)
887}
888
889const HEARTBEAT_INTERVAL_SECS: u64 = 10;
891
892async fn run_live_adapter(
893 mut adapter: Box<dyn LiveLogAdapter>,
894 writer: SessionLogWriter,
895 mut stop_rx: watch::Receiver<bool>,
896) -> Result<()> {
897 let mut last_heartbeat = tokio::time::Instant::now();
898 loop {
899 adapter.poll(&writer).await?;
900
901 if last_heartbeat.elapsed().as_secs() >= HEARTBEAT_INTERVAL_SECS {
903 let _ = writer.emit(
904 LogSourceKind::Wrapper,
905 LogEventKind::Heartbeat {
906 interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
907 },
908 );
909 last_heartbeat = tokio::time::Instant::now();
910 }
911
912 tokio::select! {
913 changed = stop_rx.changed() => {
914 if changed.is_ok() && *stop_rx.borrow() {
915 break;
916 }
917 }
918 _ = tokio::time::sleep(std::time::Duration::from_millis(250)) => {}
919 }
920 }
921 adapter.finalize(&writer).await
922}
923
924async fn run_heartbeat_loop(
926 writer: SessionLogWriter,
927 mut stop_rx: watch::Receiver<bool>,
928) -> Result<()> {
929 loop {
930 tokio::select! {
931 changed = stop_rx.changed() => {
932 if changed.is_ok() && *stop_rx.borrow() {
933 break;
934 }
935 }
936 _ = tokio::time::sleep(std::time::Duration::from_secs(HEARTBEAT_INTERVAL_SECS)) => {
937 let _ = writer.emit(
938 LogSourceKind::Wrapper,
939 LogEventKind::Heartbeat {
940 interval_secs: Some(HEARTBEAT_INTERVAL_SECS),
941 },
942 );
943 }
944 }
945 }
946 Ok(())
947}
948
949fn next_sequence(path: &Path) -> Result<u64> {
950 if !path.exists() {
951 return Ok(1);
952 }
953 let file = File::open(path).with_context(|| format!("Failed to open {}", path.display()))?;
954 let reader = BufReader::new(file);
955 let mut last_seq = 0;
956 for line in reader.lines() {
957 let line = line?;
958 if line.trim().is_empty() {
959 continue;
960 }
961 if let Ok(value) = serde_json::from_str::<Value>(&line)
962 && let Some(seq) = value.get("seq").and_then(|seq| seq.as_u64())
963 {
964 last_seq = seq;
965 }
966 }
967 Ok(last_seq + 1)
968}
969
970fn load_index(path: &Path) -> Result<SessionLogIndex> {
971 if !path.exists() {
972 return Ok(SessionLogIndex::default());
973 }
974 let content = std::fs::read_to_string(path)
975 .with_context(|| format!("Failed to read {}", path.display()))?;
976 Ok(serde_json::from_str(&content).unwrap_or_default())
977}
978
979fn save_index(path: &Path, index: &SessionLogIndex) -> Result<()> {
980 let content = serde_json::to_string_pretty(index)?;
981 crate::file_util::atomic_write_str(path, &content)
982 .with_context(|| format!("Failed to write {}", path.display()))
983}
984
985fn load_backfill_state(path: &Path) -> Result<BackfillState> {
986 if !path.exists() {
987 return Ok(BackfillState::default());
988 }
989 let content = std::fs::read_to_string(path)
990 .with_context(|| format!("Failed to read {}", path.display()))?;
991 Ok(serde_json::from_str(&content).unwrap_or_default())
992}
993
994fn save_backfill_state(path: &Path, state: &BackfillState) -> Result<()> {
995 let content = serde_json::to_string_pretty(state)?;
996 crate::file_util::atomic_write_str(path, &content)
997 .with_context(|| format!("Failed to write {}", path.display()))
998}
999
1000fn rank_completeness(completeness: LogCompleteness) -> u8 {
1001 match completeness {
1002 LogCompleteness::Full => 3,
1003 LogCompleteness::Partial => 2,
1004 LogCompleteness::MetadataOnly => 1,
1005 }
1006}
1007
1008fn session_key(metadata: &SessionLogMetadata) -> String {
1009 format!(
1010 "{}:{}",
1011 metadata.provider,
1012 metadata
1013 .provider_session_id
1014 .as_deref()
1015 .unwrap_or(&metadata.wrapper_session_id)
1016 )
1017}
1018
1019#[cfg(test)]
1020#[path = "session_log_tests.rs"]
1021mod tests;