use std::path::{Path, PathBuf};
use anyhow::Result;
use tokio::fs;
use super::header::SessionHeader;
use super::tail_load::TailLoad;
use super::tail_seed::with_tail_cap;
use super::types::Session;
impl Session {
    /// Load a session by id, reading and deserializing its JSON file.
    pub async fn load(id: &str) -> Result<Self> {
        let path = Self::session_path(id)?;
        let content = fs::read_to_string(&path).await?;
        let session: Session = serde_json::from_str(&content)?;
        Ok(session)
    }

    /// Most recent session for `workspace` (any workspace when `None`),
    /// loaded in full — no tail window applied.
    pub async fn last_for_directory(workspace: Option<&std::path::Path>) -> Result<Self> {
        Self::last_for_directory_tail(workspace, usize::MAX)
            .await
            .map(|t| t.session)
    }

    /// Most recent session for `workspace`, with the message tail capped at
    /// `window` entries. The directory scan and JSON parsing are blocking
    /// work, so they run on tokio's blocking thread pool.
    pub async fn last_for_directory_tail(
        workspace: Option<&std::path::Path>,
        window: usize,
    ) -> Result<TailLoad> {
        let sessions_dir = Self::sessions_dir()?;
        // Canonicalize once up front so directory comparisons during the scan
        // are stable; fall back to the raw path if canonicalization fails
        // (e.g. the directory no longer exists).
        let canonical_workspace =
            workspace.map(|w| w.canonicalize().unwrap_or_else(|_| w.to_path_buf()));
        tokio::task::spawn_blocking(move || {
            scan_with_index(&sessions_dir, canonical_workspace, window)
        })
        .await
        .map_err(|e| anyhow::anyhow!("session scan task panicked: {e}"))?
    }

    /// Most recent session across all workspaces.
    pub async fn last() -> Result<Self> {
        Self::last_for_directory(None).await
    }

    /// Persist the session atomically: serialize off the async runtime, write
    /// to a sibling temp file, then rename over the target. Also refreshes the
    /// workspace→session index in the background (best-effort, non-fatal).
    pub async fn save(&self) -> Result<()> {
        let path = Self::session_path(&self.id)?;
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).await?;
        }
        let tmp = path.with_extension("json.tmp");
        // Serialization of a large session is CPU-bound; keep it off the
        // async worker threads.
        let snapshot = self.clone();
        let content = tokio::task::spawn_blocking(move || serde_json::to_vec(&snapshot))
            .await
            .map_err(|e| anyhow::anyhow!("session serialize task panicked: {e}"))??;
        fs::write(&tmp, content).await?;
        // On platforms/filesystems where rename-over-existing fails, remove
        // the destination and retry once; clean up the temp file on failure.
        if let Err(primary) = fs::rename(&tmp, &path).await {
            let _ = fs::remove_file(&path).await;
            if let Err(retry) = fs::rename(&tmp, &path).await {
                let _ = fs::remove_file(&tmp).await;
                return Err(anyhow::anyhow!(
                    "session rename failed: {primary} (retry: {retry})"
                ));
            }
        }
        if let Some(dir) = &self.metadata.directory {
            let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
            let session_id = self.id.clone();
            // Fire-and-forget: a stale index is tolerated by the scan path,
            // which verifies the mapping before trusting it.
            tokio::task::spawn_blocking(move || {
                if let Err(err) = super::workspace_index_io::upsert_sync(&canonical, &session_id) {
                    tracing::debug!(%err, "workspace index upsert failed (non-fatal)");
                }
            });
        }
        Ok(())
    }

    /// Delete the session file for `id`. Succeeds if the file is already gone.
    pub async fn delete(id: &str) -> Result<()> {
        let path = Self::session_path(id)?;
        // Remove directly instead of a blocking `exists()` pre-check: the old
        // check was a synchronous syscall on the async runtime and raced with
        // concurrent deleters (TOCTOU). Treat NotFound as success.
        match fs::remove_file(&path).await {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(e.into()),
        }
    }

    /// Directory holding all session JSON files (`<data_dir>/sessions`),
    /// cached after the first successful resolution.
    pub(crate) fn sessions_dir() -> Result<PathBuf> {
        use std::sync::OnceLock;
        static CACHED: OnceLock<PathBuf> = OnceLock::new();
        if let Some(dir) = CACHED.get() {
            return Ok(dir.clone());
        }
        let dir = crate::config::Config::data_dir()
            .map(|d| d.join("sessions"))
            .ok_or_else(|| anyhow::anyhow!("Could not determine data directory"))?;
        // Losing a set race is fine — every thread computes the same path.
        let _ = CACHED.set(dir.clone());
        Ok(dir)
    }

    /// Full path of the JSON file backing session `id`.
    pub(crate) fn session_path(id: &str) -> Result<PathBuf> {
        Ok(Self::sessions_dir()?.join(format!("{}.json", id)))
    }
}
/// Find the most recent session for `canonical_workspace`, consulting the
/// persisted workspace→session index before falling back to a full scan.
/// On a successful scan, the index entry is refreshed (best-effort).
fn scan_with_index(
    sessions_dir: &Path,
    canonical_workspace: Option<PathBuf>,
    window: usize,
) -> Result<TailLoad> {
    // Fast path: the index may already map this workspace to a session id.
    if let Some(ws) = canonical_workspace.as_ref() {
        let index = super::workspace_index::WorkspaceIndex::load_sync();
        if let Some(id) = index.get(ws) {
            let candidate = sessions_dir.join(format!("{id}.json"));
            if candidate.exists() {
                if let Ok(load) = tail_load_sync(&candidate, window) {
                    // The mapping can be stale; trust it only when the stored
                    // directory canonicalizes to the requested workspace.
                    let belongs_here = match load.session.metadata.directory.as_ref() {
                        Some(dir) => {
                            let canonical = dir.canonicalize().unwrap_or_else(|_| dir.clone());
                            &canonical == ws
                        }
                        None => false,
                    };
                    if belongs_here {
                        return Ok(load);
                    }
                }
            }
        }
    }
    // Slow path: scan every session file on disk.
    let scanned = scan_sync(sessions_dir, canonical_workspace.clone(), window);
    if let Some(ws) = canonical_workspace.as_ref() {
        if let Ok(load) = &scanned {
            // Best-effort index refresh so the next lookup takes the fast path.
            let _ = super::workspace_index_io::upsert_sync(ws, &load.session.id);
        }
    }
    scanned
}
/// Synchronously read and parse one session file, capping the message tail at
/// `window` entries via `with_tail_cap`. Returns the parsed session together
/// with the number of dropped messages and the on-disk file size.
fn tail_load_sync(path: &Path, window: usize) -> Result<TailLoad> {
    use std::fs;
    use std::io::BufReader;
    let file = fs::File::open(path)?;
    // Stat the open handle rather than the path: one fewer syscall and no
    // race against a concurrent rename/delete between stat and open.
    let file_bytes = file.metadata().map(|m| m.len()).unwrap_or(0);
    let reader = BufReader::with_capacity(64 * 1024, file);
    let (parsed, dropped) = with_tail_cap(window, || {
        serde_json::from_reader::<_, Session>(reader)
    });
    Ok(TailLoad {
        session: parsed?,
        dropped,
        file_bytes,
    })
}
fn scan_sync(
sessions_dir: &Path,
canonical_workspace: Option<PathBuf>,
window: usize,
) -> Result<TailLoad> {
use std::fs;
use std::io::BufReader;
use std::time::SystemTime;
if !sessions_dir.exists() {
anyhow::bail!("No sessions found");
}
let mut candidates: Vec<(PathBuf, SystemTime)> = Vec::new();
for entry in fs::read_dir(sessions_dir)? {
let entry = match entry {
Ok(e) => e,
Err(_) => continue,
};
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("json") {
continue;
}
let mtime = entry
.metadata()
.ok()
.and_then(|m| m.modified().ok())
.unwrap_or(SystemTime::UNIX_EPOCH);
candidates.push((path, mtime));
}
if candidates.is_empty() {
anyhow::bail!("No sessions found");
}
candidates.sort_by(|a, b| b.1.cmp(&a.1));
let needle: Option<Vec<u8>> = canonical_workspace.as_ref().map(|ws| {
let quoted = serde_json::to_string(&ws.to_string_lossy()).unwrap_or_default();
let inner = quoted
.strip_prefix('"')
.and_then(|s| s.strip_suffix('"'))
.unwrap_or("ed);
inner.as_bytes().to_vec()
});
let finder = needle
.as_ref()
.map(|n| memchr::memmem::Finder::new(n.as_slice()).into_owned());
let prefilter_hits: Vec<bool> = match (finder.as_ref(), candidates.len()) {
(None, _) => vec![true; candidates.len()],
(Some(_), 0..=1) => vec![true; candidates.len()], (Some(finder), _) => {
let paths: Vec<&Path> = candidates.iter().map(|(p, _)| p.as_path()).collect();
let results: std::sync::Mutex<Vec<Option<bool>>> =
std::sync::Mutex::new(vec![None; paths.len()]);
std::thread::scope(|scope| {
let threads =
std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4).min(8);
let chunk_size = paths.len().div_ceil(threads);
for chunk_idx in 0..threads {
let start = chunk_idx * chunk_size;
if start >= paths.len() {
break;
}
let end = (start + chunk_size).min(paths.len());
let chunk_paths = &paths[start..end];
let results = &results;
scope.spawn(move || {
for (offset, p) in chunk_paths.iter().enumerate() {
let hit = file_contains_finder(p, finder).unwrap_or(false);
if let Ok(mut guard) = results.lock() {
guard[start + offset] = Some(hit);
}
}
});
}
});
results
.into_inner()
.unwrap_or_default()
.into_iter()
.map(|o| o.unwrap_or(false))
.collect()
}
};
for (idx, (path, _)) in candidates.iter().enumerate() {
if !prefilter_hits.get(idx).copied().unwrap_or(false) {
continue;
}
let header_ok = (|| -> Result<bool> {
let file = fs::File::open(path)?;
let reader = BufReader::with_capacity(16 * 1024, file);
let header: SessionHeader = match serde_json::from_reader(reader) {
Ok(h) => h,
Err(_) => return Ok(false),
};
if let Some(ref ws) = canonical_workspace {
let Some(dir) = header.metadata.directory.as_ref() else {
return Ok(false);
};
if dir == ws {
return Ok(true);
}
let canonical_dir = dir.canonicalize().unwrap_or_else(|_| dir.clone());
Ok(&canonical_dir == ws)
} else {
Ok(true)
}
})();
match header_ok {
Ok(true) => {}
Ok(false) => continue,
Err(err) => {
tracing::warn!(
path = %path.display(),
error = %err,
"skipping unreadable session file",
);
continue;
}
}
let file_bytes = fs::metadata(path).map(|m| m.len()).unwrap_or(0);
let file = fs::File::open(path)?;
let reader = BufReader::with_capacity(64 * 1024, file);
let (parsed, dropped) =
with_tail_cap(window, || serde_json::from_reader::<_, Session>(reader));
return Ok(TailLoad {
session: parsed?,
dropped,
file_bytes,
});
}
anyhow::bail!("No sessions found")
}
/// Whether the file at `path` contains the finder's needle anywhere in its
/// raw bytes. Memory-maps the file so large files are searched without
/// copying their contents into a buffer.
fn file_contains_finder(path: &Path, finder: &memchr::memmem::Finder<'_>) -> Result<bool> {
    let needle_len = finder.needle().len();
    // An empty needle trivially matches anything.
    if needle_len == 0 {
        return Ok(true);
    }
    let file = std::fs::File::open(path)?;
    let file_len = file.metadata()?.len();
    // A file shorter than the needle cannot contain it; this also skips
    // mapping empty files.
    if (file_len as usize) < needle_len {
        return Ok(false);
    }
    // SAFETY: read-only mapping of a regular file we just opened. As with any
    // mmap, a concurrent truncation by another process could fault the
    // mapping — accepted risk inherited from the original implementation.
    let map = unsafe { memmap2::Mmap::map(&file)? };
    Ok(finder.find(&map).is_some())
}