use crate::error::Result;
use crate::memvid::lifecycle::Memvid;
use crate::replay::{
ActionType, ActiveSession, ReplayAction, ReplayConfig, ReplayManifest, ReplaySession,
SessionSummary, StateSnapshot,
};
use uuid::Uuid;
impl Memvid {
#[cfg(feature = "replay")]
pub fn start_session(
&mut self,
name: Option<String>,
config: Option<ReplayConfig>,
) -> Result<Uuid> {
if self.active_session.is_some() {
return Err(crate::MemvidError::InvalidQuery {
reason: "A session is already active. End it before starting a new one.".into(),
});
}
let cfg = config.unwrap_or_default();
let session = ActiveSession::new(name, cfg);
let session_id = session.session_id();
self.active_session = Some(session);
tracing::info!("Started replay session: {}", session_id);
Ok(session_id)
}
#[cfg(feature = "replay")]
pub fn end_session(&mut self) -> Result<ReplaySession> {
let session =
self.active_session
.take()
.ok_or_else(|| crate::MemvidError::InvalidQuery {
reason: "No active session to end".into(),
})?;
let completed = session.end();
let session_id = completed.session_id;
self.completed_sessions.push(completed.clone());
tracing::info!("Ended replay session: {}", session_id);
Ok(completed)
}
#[cfg(feature = "replay")]
pub fn active_session_id(&self) -> Option<Uuid> {
self.active_session.as_ref().map(|s| s.session_id())
}
#[cfg(feature = "replay")]
pub fn is_recording(&self) -> bool {
self.active_session.is_some()
}
#[cfg(feature = "replay")]
pub fn create_checkpoint(&mut self) -> Result<u64> {
let session =
self.active_session
.as_mut()
.ok_or_else(|| crate::MemvidError::InvalidQuery {
reason: "No active session for checkpoint".into(),
})?;
let snapshot = StateSnapshot {
frame_count: self.toc.frames.len(),
frame_ids: self.toc.frames.iter().map(|f| f.id).collect(),
lex_index_hash: self.toc.indexes.lex.as_ref().map(|m| m.checksum),
vec_index_hash: self.toc.indexes.vec.as_ref().map(|m| m.checksum),
wal_sequence: self.header.wal_sequence,
generation: self.generation,
};
let checkpoint = session.create_checkpoint(snapshot);
let checkpoint_id = checkpoint.id;
session.record_action(ReplayAction::new(
session.session.next_sequence(),
ActionType::Checkpoint {
checkpoint_id: checkpoint.id,
},
));
tracing::debug!("Created checkpoint {} in session", checkpoint_id);
Ok(checkpoint_id)
}
#[cfg(feature = "replay")]
pub fn record_put_action(&mut self, frame_id: u64, input: &[u8]) {
if let Some(session) = self.active_session.as_mut() {
let action = ReplayAction::new(
session.session.next_sequence(),
ActionType::Put { frame_id },
)
.with_input(input)
.with_affected_frames(vec![frame_id]);
session.record_action(action);
if session.should_checkpoint() {
let _ = self.create_checkpoint();
}
}
}
#[cfg(feature = "replay")]
pub fn record_find_action(
&mut self,
query: &str,
mode: &str,
result_count: usize,
result_frames: Vec<u64>,
) {
if let Some(session) = self.active_session.as_mut() {
let action = ReplayAction::new(
session.session.next_sequence(),
ActionType::Find {
query: query.to_string(),
mode: mode.to_string(),
result_count,
},
)
.with_input(query.as_bytes())
.with_affected_frames(result_frames);
session.record_action(action);
}
}
#[cfg(feature = "replay")]
pub fn record_ask_action(
&mut self,
query: &str,
provider: &str,
model: &str,
response: &[u8],
duration_ms: u64,
retrieved_frames: Vec<u64>,
) {
if let Some(session) = self.active_session.as_mut() {
let action = ReplayAction::new(
session.session.next_sequence(),
ActionType::Ask {
query: query.to_string(),
provider: provider.to_string(),
model: model.to_string(),
},
)
.with_input(query.as_bytes())
.with_output(response)
.with_duration_ms(duration_ms)
.with_affected_frames(retrieved_frames);
session.record_action(action);
}
}
#[cfg(feature = "replay")]
pub fn list_sessions(&self) -> Vec<SessionSummary> {
self.completed_sessions
.iter()
.map(SessionSummary::from)
.collect()
}
#[cfg(feature = "replay")]
pub fn get_session(&self, session_id: Uuid) -> Option<&ReplaySession> {
self.completed_sessions
.iter()
.find(|s| s.session_id == session_id)
}
#[cfg(feature = "replay")]
pub fn delete_session(&mut self, session_id: Uuid) -> Result<()> {
let pos = self
.completed_sessions
.iter()
.position(|s| s.session_id == session_id)
.ok_or_else(|| crate::MemvidError::InvalidQuery {
reason: format!("Session {} not found", session_id).into(),
})?;
self.completed_sessions.remove(pos);
tracing::info!("Deleted session {}", session_id);
Ok(())
}
#[cfg(feature = "replay")]
pub fn save_replay_sessions(&mut self) -> Result<()> {
use crate::replay::storage;
use std::io::{Seek, SeekFrom, Write};
if self.completed_sessions.is_empty() {
if self.toc.replay_manifest.is_some() {
self.toc.replay_manifest = None;
self.dirty = true;
tracing::info!("Cleared replay manifest (no sessions remaining)");
}
return Ok(());
}
let segment_data = storage::build_segment(&self.completed_sessions)?;
let segment_size = segment_data.len() as u64;
let preview_len = segment_data.len().min(32);
let hex_preview: Vec<String> = segment_data[..preview_len]
.iter()
.map(|b| format!("{:02x}", b))
.collect();
tracing::debug!(
"Writing segment with first {} bytes: {}",
preview_len,
hex_preview.join(" ")
);
tracing::debug!(
"Segment magic should be: {:?}",
String::from_utf8_lossy(&segment_data[..8.min(segment_data.len())])
);
let segment_offset = self.header.footer_offset.max(self.data_end);
tracing::debug!(
"Writing replay segment: offset={}, size={}, footer_offset={}, data_end={}",
segment_offset,
segment_size,
self.header.footer_offset,
self.data_end
);
self.file.seek(SeekFrom::Start(segment_offset))?;
self.file.write_all(&segment_data)?;
self.file.sync_all()?;
self.toc.replay_manifest = Some(ReplayManifest {
segment_offset,
segment_size,
session_count: self.completed_sessions.len() as u32,
total_actions: self
.completed_sessions
.iter()
.map(|s| s.actions.len() as u64)
.sum(),
version: crate::replay::REPLAY_SEGMENT_VERSION,
});
let new_end = segment_offset + segment_size;
self.data_end = self.data_end.max(new_end);
self.header.footer_offset = self.header.footer_offset.max(new_end);
self.dirty = true;
tracing::info!(
"Saved {} replay sessions ({} bytes) at offset {}",
self.completed_sessions.len(),
segment_size,
segment_offset
);
Ok(())
}
#[cfg(feature = "replay")]
pub fn load_replay_sessions(&mut self) -> Result<()> {
use crate::replay::storage;
use std::io::{Read, Seek, SeekFrom};
let manifest = match &self.toc.replay_manifest {
Some(m) if m.session_count > 0 => {
tracing::debug!(
"Found replay manifest: session_count={}, segment_offset={}, segment_size={}",
m.session_count,
m.segment_offset,
m.segment_size
);
m.clone()
}
Some(_) => {
tracing::debug!("Replay manifest has 0 sessions, skipping load");
return Ok(());
}
_ => {
tracing::debug!("No replay manifest in TOC, skipping load");
return Ok(());
}
};
tracing::debug!("Allocating buffer of {} bytes", manifest.segment_size);
let mut buf = vec![0u8; manifest.segment_size as usize];
tracing::debug!("Seeking to offset {}", manifest.segment_offset);
self.file
.seek(SeekFrom::Start(manifest.segment_offset))
.map_err(|e| {
tracing::error!(
"Failed to seek to replay segment at offset {}: {}",
manifest.segment_offset,
e
);
e
})?;
tracing::debug!("Reading {} bytes from file", manifest.segment_size);
self.file.read_exact(&mut buf).map_err(|e| {
tracing::error!(
"Failed to read replay segment ({} bytes): {}",
manifest.segment_size,
e
);
e
})?;
let preview_len = buf.len().min(32);
let hex_preview: Vec<String> = buf[..preview_len]
.iter()
.map(|b| format!("{:02x}", b))
.collect();
tracing::debug!(
"First {} bytes of segment: {}",
preview_len,
hex_preview.join(" ")
);
let expected_magic = b"MV2RPLY!";
let actual_magic = &buf[..8.min(buf.len())];
tracing::debug!(
"Expected magic: {:?}, Actual magic: {:?} (as string: {:?})",
expected_magic,
actual_magic,
String::from_utf8_lossy(actual_magic)
);
tracing::debug!("Parsing replay segment data");
self.completed_sessions = storage::read_segment(&buf).map_err(|e| {
tracing::error!("Failed to parse replay segment: {}", e);
e
})?;
tracing::info!(
"Loaded {} replay sessions from file",
self.completed_sessions.len()
);
Ok(())
}
#[cfg(feature = "replay")]
fn active_session_path(&self) -> std::path::PathBuf {
let mut path = self.path.clone();
let mut filename = path.file_name().unwrap_or_default().to_os_string();
filename.push(".session");
path.set_file_name(filename);
path
}
#[cfg(feature = "replay")]
pub fn save_active_session(&self) -> Result<()> {
use crate::replay::storage;
use std::io::Write;
let session = match &self.active_session {
Some(s) => s,
None => {
let path = self.active_session_path();
if path.exists() {
let _ = std::fs::remove_file(&path);
}
return Ok(());
}
};
let data = storage::serialize_active_session(session)?;
let path = self.active_session_path();
let mut file = std::fs::File::create(&path)?;
file.write_all(&data)?;
file.sync_all()?;
tracing::debug!("Saved active session to {:?}", path);
Ok(())
}
#[cfg(feature = "replay")]
pub fn load_active_session(&mut self) -> Result<bool> {
use crate::replay::storage;
let path = self.active_session_path();
if !path.exists() {
return Ok(false);
}
let data = std::fs::read(&path)?;
match storage::deserialize_active_session(&data) {
Ok(session) => {
tracing::info!(
"Loaded active session {} from {:?}",
session.session_id(),
path
);
self.active_session = Some(session);
Ok(true)
}
Err(e) => {
tracing::warn!("Failed to load active session: {}, removing stale file", e);
let _ = std::fs::remove_file(&path);
Ok(false)
}
}
}
#[cfg(feature = "replay")]
pub fn clear_active_session_file(&self) -> Result<()> {
let path = self.active_session_path();
if path.exists() {
std::fs::remove_file(&path)?;
}
Ok(())
}
}