use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use ahash::AHashMap;
use fs2::FileExt;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use crate::extract::{FileMapL1, FileMapL2, SCHEMA_VER};
use crate::hashing::{self, Hash};
use crate::index::{IndexDb, IndexError};
#[cfg(feature = "intelligence")]
use crate::lance::LanceStore;
use crate::path::RelPath;
/// File name of a view's serialized index, stored inside that view's directory.
pub const INDEX_FILE: &str = "index.msgpack";
/// Subdirectory of the basemind directory that holds the L1/L2 (and document) blobs.
pub const BLOBS_DIR: &str = "blobs";
/// File inside the basemind directory on which the exclusive store lock is taken.
pub const LOCK_FILE: &str = ".lock";
/// JSON sidecar inside the basemind directory describing the current lock holder.
pub const LOCK_META_FILE: &str = ".lock.meta";
/// Subdirectory of the basemind directory that contains one directory per view.
pub const VIEWS_DIR: &str = "views";
/// Subdirectory of the basemind directory handed to `LanceStore::open`.
#[cfg(feature = "intelligence")]
pub const LANCE_DIR: &str = "lance";
/// Name of the working view: the only view that opens read-only without a prior scan,
/// and the destination of the legacy top-level index migration.
pub const VIEW_WORKING: &str = "working";
/// Name of the staged view.
// NOTE(review): not referenced elsewhere in this file; presumably used by callers — confirm.
pub const VIEW_STAGED: &str = "staged";
/// Builds the view name for a revision by prefixing `short_sha` with `rev-`.
pub fn view_name_for_rev(short_sha: &str) -> String {
    const PREFIX: &str = "rev-";
    let mut name = String::with_capacity(PREFIX.len() + short_sha.len());
    name.push_str(PREFIX);
    name.push_str(short_sha);
    name
}
/// The kind of basemind process that is taking the store lock.
///
/// The variant only selects the human-readable label recorded alongside the lock.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockHolder {
    Serve,
    Watch,
    Scan,
    Rescan,
    Maintenance,
}

impl LockHolder {
    /// Returns the label that names this holder in lock metadata and error messages.
    pub fn command(self) -> &'static str {
        match self {
            Self::Serve => "basemind serve",
            Self::Watch => "basemind watch",
            Self::Scan => "basemind scan",
            Self::Rescan => "basemind rescan",
            Self::Maintenance => "a basemind cache/maintenance task",
        }
    }
}
/// Metadata describing the process that acquired the store lock.
///
/// Written as JSON to the lock sidecar file right after the lock is taken, and read back
/// to name the holder when another process fails to acquire the lock.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockMeta {
/// Label of the holding command, as returned by `LockHolder::command`.
pub command: String,
/// Process id of the holder, captured with `std::process::id()` at acquisition time.
pub pid: u32,
/// Acquisition time in whole seconds since the Unix epoch (0 if the clock reads earlier).
pub acquired_unix: i64,
}
/// Errors produced while opening, reading, or writing the on-disk store.
#[derive(Debug, Error)]
pub enum StoreError {
/// A filesystem operation on `path` failed with `source`.
#[error("io error on {path}: {source}")]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
/// Serializing a value to MessagePack failed.
#[error("msgpack encode error: {0}")]
Encode(#[from] rmp_serde::encode::Error),
/// Deserializing MessagePack bytes failed.
#[error("msgpack decode error: {0}")]
Decode(#[from] rmp_serde::decode::Error),
/// A stored index or blob carries a schema version other than the current `SCHEMA_VER`.
#[error("schema version mismatch: stored {found}, current {expected}")]
SchemaMismatch { found: u16, expected: u16 },
/// The store lock at `path` could not be acquired; `holder` is the sidecar metadata of
/// the current holder when it could be read.
#[error("{}", lock_contention_message(.path, .holder))]
Locked {
path: PathBuf,
holder: Option<LockMeta>,
},
/// An error bubbled up from the inverted index database.
#[error("inverted index error: {0}")]
Index(#[from] IndexError),
/// A read-only open targeted a non-working view that has no index file yet.
#[error(
"view {view:?} has not been scanned; run `basemind scan --view {view}` \
(or omit --view to use the working view)"
)]
ViewNotScanned { view: String },
}
impl StoreError {
pub fn is_lock_contention(&self) -> bool {
matches!(
self,
StoreError::Locked { .. } | StoreError::Index(IndexError::Fjall(fjall::Error::Locked))
)
}
}
fn lock_contention_message(path: &Path, holder: &Option<LockMeta>) -> String {
match holder {
Some(meta) => format!(
"another basemind process holds the lock on {} (`{}`, pid {})",
path.display(),
meta.command,
meta.pid
),
None => format!(
"another basemind process holds the lock on {} (usually the `basemind serve` MCP \
server from your editor plugin, or `basemind watch`)",
path.display()
),
}
}
/// Remediation hint to show alongside a lock-contention error: use the serving process's
/// `rescan` tool, or stop that server before running `basemind scan`.
pub const LOCK_CONTENTION_HELP: &str = "the basemind index is locked by another process \
(likely the MCP server). If an editor/plugin is serving this repo, use its `rescan` tool \
to refresh the index, or stop that server before running `basemind scan`.";
/// The per-view file index that is serialized to the view's index file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Index {
/// Schema version the index was written with; compared against `SCHEMA_VER` on load.
pub schema_ver: u16,
/// Entries keyed by relative path.
pub files: AHashMap<RelPath, FileEntry>,
}
impl Index {
pub fn empty() -> Self {
Self {
schema_ver: SCHEMA_VER,
files: AHashMap::new(),
}
}
}
/// Index record for a single file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct FileEntry {
/// Hex-encoded hash of the file.
// NOTE(review): presumably the same hex string used to name the file's blobs — confirm.
pub hash_hex: String,
/// Language label recorded for the file.
pub language: String,
/// File size in bytes.
pub size_bytes: u64,
/// Modification time of the file.
// NOTE(review): unit/epoch is not established in this file — confirm against the scanner.
pub mtime: i64,
}
/// Handle on the on-disk store for one view of a repository.
pub struct Store {
/// Repository root the store was opened for.
pub root: PathBuf,
/// The basemind directory under `root`.
pub basemind_dir: PathBuf,
/// Directory of the opened view (`<basemind_dir>/views/<view>`).
pub view_dir: PathBuf,
/// Name of the opened view.
pub view: String,
/// In-memory index; loaded from the view's index file, or empty when none was usable.
pub index: Index,
/// Inverted index database. Always `Some` for a writer; for a read-only store it is
/// `None` when the schema mismatched, the view directory is missing, or opening failed.
pub index_db: Option<IndexDb>,
/// Lance store, opened on first use by `lance_or_open`; `None` until then.
#[cfg(feature = "intelligence")]
pub lance: Option<LanceStore>,
/// Lock file handle kept for the lifetime of a writer store; `None` for read-only stores.
// NOTE(review): presumably the lock is released when this handle is dropped — confirm.
_lock: Option<File>,
}
impl Store {
/// Opens the store for writing, recording the holder as `LockHolder::Maintenance`.
///
/// Delegates to `open_with_holder`; see that method for the errors it can return.
pub fn open(root: &Path, view: &str) -> Result<Self, StoreError> {
Self::open_with_holder(root, view, LockHolder::Maintenance)
}
/// Opens the store for writing on behalf of `holder`.
///
/// Creates the basemind directory layout (including its `.gitignore`, the blobs directory,
/// and the view directory), migrates a legacy top-level index if present, takes the
/// exclusive store lock, and loads the view's index. An index with a stale schema version
/// is deleted and replaced by an empty one.
///
/// # Errors
/// Returns `StoreError::Locked` when the lock cannot be acquired, and propagates I/O,
/// decode, and index-database errors.
pub fn open_with_holder(
    root: &Path,
    view: &str,
    holder: LockHolder,
) -> Result<Self, StoreError> {
    let basemind_dir = root.join(crate::config::BASEMIND_DIR);
    let views_root = basemind_dir.join(VIEWS_DIR);

    ensure_dir(&basemind_dir)?;
    ensure_gitignore(&basemind_dir)?;
    ensure_dir(&basemind_dir.join(BLOBS_DIR))?;
    ensure_dir(&views_root)?;
    migrate_legacy_index_into_views(&basemind_dir)?;

    let view_dir = views_root.join(view);
    ensure_dir(&view_dir)?;

    // The lock is taken before the index is read and stays alive inside the returned store.
    let lock = acquire_lock_as(&basemind_dir, holder)?;

    let index = match read_index(&view_dir) {
        Ok(loaded) => loaded.unwrap_or_else(Index::empty),
        Err(StoreError::SchemaMismatch { found, expected }) => {
            tracing::info!(
                found,
                expected,
                view,
                "cache schema bumped; refreshing view in place (re-extract + GC reclaims orphans)"
            );
            wipe_view(&view_dir)?;
            Index::empty()
        }
        Err(other) => return Err(other),
    };

    let db = IndexDb::open(&view_dir)?;
    Ok(Self {
        root: root.to_path_buf(),
        basemind_dir,
        view_dir,
        view: view.to_string(),
        index,
        index_db: Some(db),
        #[cfg(feature = "intelligence")]
        lance: None,
        _lock: Some(lock),
    })
}
/// Opens the store without taking the lock.
///
/// A non-working view must already have an index file, otherwise
/// `StoreError::ViewNotScanned` is returned. The working view opens with an empty index
/// when it was never scanned. On a schema mismatch the index reads as empty and the
/// inverted index database is left unopened; a failure to open that database is tolerated
/// and also yields `None`.
pub fn open_read_only(root: &Path, view: &str) -> Result<Self, StoreError> {
    let basemind_dir = root.join(crate::config::BASEMIND_DIR);
    if basemind_dir.exists() {
        // Best-effort: a failed migration must not prevent a read-only open.
        let _ = migrate_legacy_index_into_views(&basemind_dir);
    }

    let view_dir = basemind_dir.join(VIEWS_DIR).join(view);
    let is_working = view == VIEW_WORKING;
    if !is_working && !view_dir.join(INDEX_FILE).exists() {
        return Err(StoreError::ViewNotScanned {
            view: view.to_string(),
        });
    }

    let mut schema_ok = true;
    let index = match read_index(&view_dir) {
        Ok(loaded) => loaded.unwrap_or_else(Index::empty),
        Err(StoreError::SchemaMismatch { found, expected }) => {
            tracing::warn!(
                found,
                expected,
                "cache schema mismatch; index reads empty until `basemind scan` refreshes it"
            );
            schema_ok = false;
            Index::empty()
        }
        Err(other) => return Err(other),
    };

    let index_db = if schema_ok && view_dir.exists() {
        match IndexDb::open(&view_dir) {
            Ok(db) => Some(db),
            Err(_) => None,
        }
    } else {
        None
    };

    Ok(Self {
        root: root.to_path_buf(),
        basemind_dir,
        view_dir,
        view: view.to_string(),
        index,
        index_db,
        #[cfg(feature = "intelligence")]
        lance: None,
        _lock: None,
    })
}
/// Returns the Lance store, opening it under the basemind directory on first use.
///
/// `dim` and `embedding_model` are forwarded to `LanceStore::open` only when the store
/// has not been opened yet; later calls return the cached store.
///
/// # Errors
/// Propagates any failure from `LanceStore::open`.
#[cfg(feature = "intelligence")]
pub fn lance_or_open(
    &mut self,
    dim: u16,
    embedding_model: &str,
) -> Result<&LanceStore, anyhow::Error> {
    if self.lance.is_none() {
        let lance_dir = self.basemind_dir.join(LANCE_DIR);
        self.lance = Some(LanceStore::open(&lance_dir, dim, embedding_model)?);
    }
    let cached = self.lance.as_ref().expect("lance store just populated");
    Ok(cached)
}
/// Path of the L1 blob for `hash` (hex-encodes the hash, then defers to `blob_path_l1_hex`).
pub fn blob_path_l1(&self, hash: &Hash) -> PathBuf {
    let hex = hashing::hex_buf(hash);
    let hex_name = hashing::hex_str(&hex);
    self.blob_path_l1_hex(hex_name)
}
/// Path of the L2 blob for `hash` (hex-encodes the hash, then defers to `blob_path_l2_hex`).
pub fn blob_path_l2(&self, hash: &Hash) -> PathBuf {
    let hex = hashing::hex_buf(hash);
    let hex_name = hashing::hex_str(&hex);
    self.blob_path_l2_hex(hex_name)
}
/// Path of the L1 blob named by `hash_hex`: `<basemind_dir>/blobs/<hash_hex>.l1.msgpack`.
pub fn blob_path_l1_hex(&self, hash_hex: &str) -> PathBuf {
    let mut blob = self.basemind_dir.join(BLOBS_DIR);
    blob.push(format!("{hash_hex}.l1.msgpack"));
    blob
}
/// Path of the L2 blob named by `hash_hex`: `<basemind_dir>/blobs/<hash_hex>.l2.msgpack`.
pub fn blob_path_l2_hex(&self, hash_hex: &str) -> PathBuf {
    let mut blob = self.basemind_dir.join(BLOBS_DIR);
    blob.push(format!("{hash_hex}.l2.msgpack"));
    blob
}
/// Path of the document blob for `hash` (hex-encodes the hash, then defers to
/// `blob_path_doc_hex`).
#[cfg(feature = "documents")]
pub fn blob_path_doc(&self, hash: &Hash) -> PathBuf {
    let hex = hashing::hex_buf(hash);
    let hex_name = hashing::hex_str(&hex);
    self.blob_path_doc_hex(hex_name)
}
/// Path of the document blob named by `hash_hex`:
/// `<basemind_dir>/blobs/<hash_hex>.doc.msgpack`.
#[cfg(feature = "documents")]
pub fn blob_path_doc_hex(&self, hash_hex: &str) -> PathBuf {
    let mut blob = self.basemind_dir.join(BLOBS_DIR);
    blob.push(format!("{hash_hex}.doc.msgpack"));
    blob
}
/// Reads the L1 blob for `hash`; hex-encodes the hash and defers to `read_l1_by_hex`.
pub fn read_l1(&self, hash: &Hash) -> Result<Option<FileMapL1>, StoreError> {
    let hex = hashing::hex_buf(hash);
    let hex_name = hashing::hex_str(&hex);
    self.read_l1_by_hex(hex_name)
}
/// Reads the L2 blob for `hash`; hex-encodes the hash and defers to `read_l2_by_hex`.
pub fn read_l2(&self, hash: &Hash) -> Result<Option<FileMapL2>, StoreError> {
    let hex = hashing::hex_buf(hash);
    let hex_name = hashing::hex_str(&hex);
    self.read_l2_by_hex(hex_name)
}
/// Reads and decodes the L1 blob named by `hash_hex`.
///
/// Returns `Ok(None)` when the blob file does not exist.
///
/// # Errors
/// - `StoreError::Io` when the file exists but cannot be read.
/// - `StoreError::Decode` when the bytes are not a valid `FileMapL1`.
/// - `StoreError::SchemaMismatch` when the blob was written with another schema version.
pub fn read_l1_by_hex(&self, hash_hex: &str) -> Result<Option<FileMapL1>, StoreError> {
    let path = self.blob_path_l1_hex(hash_hex);
    // Read directly instead of probing with `exists()` first: the blob can be removed
    // between the probe and the read, and a vanished file must still mean "no blob".
    let bytes = match std::fs::read(&path) {
        Ok(bytes) => bytes,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(source) => return Err(StoreError::Io { path, source }),
    };
    let map: FileMapL1 = rmp_serde::from_slice(&bytes)?;
    check_schema(map.schema_ver)?;
    Ok(Some(map))
}
/// Reads and decodes the L2 blob named by `hash_hex`.
///
/// Returns `Ok(None)` when the blob file does not exist.
///
/// # Errors
/// - `StoreError::Io` when the file exists but cannot be read.
/// - `StoreError::Decode` when the bytes are not a valid `FileMapL2`.
/// - `StoreError::SchemaMismatch` when the blob was written with another schema version.
pub fn read_l2_by_hex(&self, hash_hex: &str) -> Result<Option<FileMapL2>, StoreError> {
    let path = self.blob_path_l2_hex(hash_hex);
    // Read directly instead of probing with `exists()` first: the blob can be removed
    // between the probe and the read, and a vanished file must still mean "no blob".
    let bytes = match std::fs::read(&path) {
        Ok(bytes) => bytes,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(source) => return Err(StoreError::Io { path, source }),
    };
    let map: FileMapL2 = rmp_serde::from_slice(&bytes)?;
    check_schema(map.schema_ver)?;
    Ok(Some(map))
}
/// Writes `map` as the L1 blob for `hash` via `write_blob`.
pub fn write_l1(&self, hash: &Hash, map: &FileMapL1) -> Result<(), StoreError> {
    let target = self.blob_path_l1(hash);
    write_blob(target, map)
}
/// Writes `map` as the L2 blob for `hash` via `write_blob`.
pub fn write_l2(&self, hash: &Hash, map: &FileMapL2) -> Result<(), StoreError> {
    let target = self.blob_path_l2(hash);
    write_blob(target, map)
}
/// Writes `map` as the document blob for `hash` via `write_blob`.
#[cfg(feature = "documents")]
pub fn write_doc(
    &self,
    hash: &Hash,
    map: &crate::extract::doc::FileMapDoc,
) -> Result<(), StoreError> {
    let target = self.blob_path_doc(hash);
    write_blob(target, map)
}
/// Reads and decodes the document blob named by `hash_hex`.
///
/// Returns `Ok(None)` when the blob file does not exist.
///
/// # Errors
/// - `StoreError::Io` when the file exists but cannot be read.
/// - `StoreError::Decode` when the bytes are not a valid `FileMapDoc`.
/// - `StoreError::SchemaMismatch` when the blob was written with another schema version.
#[cfg(feature = "documents")]
pub fn read_doc_by_hex(
    &self,
    hash_hex: &str,
) -> Result<Option<crate::extract::doc::FileMapDoc>, StoreError> {
    let path = self.blob_path_doc_hex(hash_hex);
    // Read directly instead of probing with `exists()` first: the blob can be removed
    // between the probe and the read, and a vanished file must still mean "no blob".
    let bytes = match std::fs::read(&path) {
        Ok(bytes) => bytes,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(source) => return Err(StoreError::Io { path, source }),
    };
    let map: crate::extract::doc::FileMapDoc = rmp_serde::from_slice(&bytes)?;
    check_schema(map.schema_ver)?;
    Ok(Some(map))
}
/// Inserts `entry` under `rel` in the in-memory index, replacing any existing entry.
///
/// The change is not persisted until `flush` is called.
pub fn upsert(&mut self, rel: impl Into<RelPath>, entry: FileEntry) {
    let key: RelPath = rel.into();
    let _replaced = self.index.files.insert(key, entry);
}
/// Drops the entry for `rel` from the in-memory index, if there is one.
///
/// The change is not persisted until `flush` is called.
pub fn remove(&mut self, rel: impl AsRef<[u8]>) {
    let key = bstr::BStr::new(rel.as_ref());
    let _removed = self.index.files.remove(key);
}
/// Looks up the in-memory index entry for `rel`, if any.
pub fn lookup(&self, rel: impl AsRef<[u8]>) -> Option<&FileEntry> {
    let key = bstr::BStr::new(rel.as_ref());
    self.index.files.get(key)
}
pub fn flush(&self) -> Result<(), StoreError> {
let final_path = self.view_dir.join(INDEX_FILE);
let tmp_path = self.view_dir.join(format!("{INDEX_FILE}.tmp"));
let bytes = rmp_serde::to_vec_named(&self.index)?;
{
let mut f = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&tmp_path)
.map_err(|source| StoreError::Io {
path: tmp_path.clone(),
source,
})?;
f.write_all(&bytes).map_err(|source| StoreError::Io {
path: tmp_path.clone(),
source,
})?;
f.sync_all().map_err(|source| StoreError::Io {
path: tmp_path.clone(),
source,
})?;
}
std::fs::rename(&tmp_path, &final_path).map_err(|source| StoreError::Io {
path: final_path,
source,
})?;
Ok(())
}
}
/// Creates `p` and any missing parents, mapping failure to `StoreError::Io` for `p`.
fn ensure_dir(p: &Path) -> Result<(), StoreError> {
    match std::fs::create_dir_all(p) {
        Ok(()) => Ok(()),
        Err(source) => Err(StoreError::Io {
            path: p.to_path_buf(),
            source,
        }),
    }
}
/// Writes a catch-all `.gitignore` into the basemind directory unless one already exists.
///
/// An existing file is left untouched, whatever its contents.
fn ensure_gitignore(basemind_dir: &Path) -> Result<(), StoreError> {
    const CONTENTS: &str = "# basemind's machine-local index — not version-controlled.\n*\n";
    let target = basemind_dir.join(".gitignore");
    if !target.exists() {
        if let Err(source) = std::fs::write(&target, CONTENTS) {
            return Err(StoreError::Io {
                path: target,
                source,
            });
        }
    }
    Ok(())
}
/// Deletes the view's index file if it exists; nothing else in the view directory is touched.
fn wipe_view(view_dir: &Path) -> Result<(), StoreError> {
    let stale_index = view_dir.join(INDEX_FILE);
    if !stale_index.exists() {
        return Ok(());
    }
    match std::fs::remove_file(&stale_index) {
        Ok(()) => Ok(()),
        Err(source) => Err(StoreError::Io {
            path: stale_index,
            source,
        }),
    }
}
pub(crate) fn wipe_blobs(basemind_dir: &Path) -> Result<(), StoreError> {
let blobs_dir = basemind_dir.join(BLOBS_DIR);
if blobs_dir.exists() {
std::fs::remove_dir_all(&blobs_dir).map_err(|source| StoreError::Io {
path: blobs_dir.clone(),
source,
})?;
std::fs::create_dir_all(&blobs_dir).map_err(|source| StoreError::Io {
path: blobs_dir,
source,
})?;
}
Ok(())
}
/// Moves a legacy top-level index file into the working view.
///
/// - No legacy file: nothing to do.
/// - The working view already has an index: the legacy file is deleted (best-effort).
/// - Otherwise the legacy file is renamed into the working view's directory.
fn migrate_legacy_index_into_views(basemind_dir: &Path) -> Result<(), StoreError> {
    let old_index = basemind_dir.join(INDEX_FILE);
    if !old_index.exists() {
        return Ok(());
    }

    let working_dir = basemind_dir.join(VIEWS_DIR).join(VIEW_WORKING);
    let new_index = working_dir.join(INDEX_FILE);
    if new_index.exists() {
        // The working view wins; a leftover legacy file is only clutter.
        let _ = std::fs::remove_file(&old_index);
        return Ok(());
    }

    ensure_dir(&working_dir)?;
    if let Err(source) = std::fs::rename(&old_index, &new_index) {
        return Err(StoreError::Io {
            path: new_index,
            source,
        });
    }
    tracing::info!(
        "migrated .basemind/index.msgpack → .basemind/views/{VIEW_WORKING}/index.msgpack"
    );
    Ok(())
}
/// Loads and decodes the index file of the view at `view_dir`.
///
/// Returns `Ok(None)` when the view has no index file (including when the view directory
/// itself does not exist).
///
/// # Errors
/// - `StoreError::Io` when the file exists but cannot be read.
/// - `StoreError::Decode` when the bytes are not a valid `Index`.
/// - `StoreError::SchemaMismatch` when the index was written with another schema version.
pub(crate) fn read_index(view_dir: &Path) -> Result<Option<Index>, StoreError> {
    let path = view_dir.join(INDEX_FILE);
    // Read directly instead of probing with `exists()` first: read-only callers hold no
    // lock, so the file may disappear between a probe and the read.
    let bytes = match std::fs::read(&path) {
        Ok(bytes) => bytes,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(source) => return Err(StoreError::Io { path, source }),
    };
    let index: Index = rmp_serde::from_slice(&bytes)?;
    check_schema(index.schema_ver)?;
    Ok(Some(index))
}
/// Acquires the store lock, recording the holder as `LockHolder::Maintenance`.
///
/// Delegates to `acquire_lock_as`.
pub(crate) fn acquire_lock(basemind_dir: &Path) -> Result<File, StoreError> {
acquire_lock_as(basemind_dir, LockHolder::Maintenance)
}
pub(crate) fn acquire_lock_as(basemind_dir: &Path, holder: LockHolder) -> Result<File, StoreError> {
let path = basemind_dir.join(LOCK_FILE);
let file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(&path)
.map_err(|source| StoreError::Io {
path: path.clone(),
source,
})?;
const LOCK_ATTEMPTS: u32 = 25;
const LOCK_BACKOFF: std::time::Duration = std::time::Duration::from_millis(20);
for attempt in 0..LOCK_ATTEMPTS {
match file.try_lock_exclusive() {
Ok(()) => {
write_lock_meta(basemind_dir, holder);
return Ok(file);
}
Err(_) if attempt + 1 < LOCK_ATTEMPTS => std::thread::sleep(LOCK_BACKOFF),
Err(_) => {
return Err(StoreError::Locked {
holder: read_lock_meta(basemind_dir),
path,
});
}
}
}
unreachable!("loop returns on the final attempt")
}
/// Records who holds the lock in the JSON sidecar next to the lock file.
///
/// Entirely best-effort: serialization or filesystem failures are ignored. The
/// metadata is written to a pid-suffixed temp file and renamed into place.
fn write_lock_meta(basemind_dir: &Path, holder: LockHolder) {
    let pid = std::process::id();
    let acquired_unix = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        // Clock reads before the epoch: fall back to 0.
        Err(_) => 0,
    };
    let meta = LockMeta {
        command: holder.command().to_string(),
        pid,
        acquired_unix,
    };
    let Ok(encoded) = serde_json::to_vec(&meta) else {
        return;
    };

    let destination = basemind_dir.join(LOCK_META_FILE);
    let staging = basemind_dir.join(format!("{LOCK_META_FILE}.{}.tmp", std::process::id()));
    if std::fs::write(&staging, &encoded).is_err() {
        return;
    }
    let _ = std::fs::rename(&staging, &destination);
}
/// Reads the lock holder sidecar; yields `None` if it is missing, unreadable, or not valid JSON.
fn read_lock_meta(basemind_dir: &Path) -> Option<LockMeta> {
    let sidecar = basemind_dir.join(LOCK_META_FILE);
    match std::fs::read(sidecar) {
        Ok(raw) => serde_json::from_slice(&raw).ok(),
        Err(_) => None,
    }
}
/// Minimal view of a blob used to read only its `schema_ver` field.
#[derive(Deserialize)]
struct BlobSchemaPeek {
    schema_ver: u16,
}

/// Returns the schema version stored in the blob at `path`, or `None` when the file
/// cannot be read or does not decode far enough to expose a `schema_ver`.
fn peek_blob_schema(path: &Path) -> Option<u16> {
    let raw = std::fs::read(path).ok()?;
    match rmp_serde::from_slice::<BlobSchemaPeek>(&raw) {
        Ok(BlobSchemaPeek { schema_ver }) => Some(schema_ver),
        Err(_) => None,
    }
}
/// Serializes `value` to MessagePack and stores it at `path`.
///
/// The write is skipped when a blob with the current schema version already exists at
/// `path`. Otherwise the bytes go to a temp file whose name includes the process id and
/// thread id (so concurrent writers never share a temp file) and are renamed into place.
/// The temp file is removed on every failure path so aborted writes do not accumulate in
/// the blobs directory.
///
/// # Errors
/// Returns `StoreError::Encode` if serialization fails and `StoreError::Io` if the temp
/// file cannot be created/written or the rename fails.
fn write_blob<T: Serialize>(path: PathBuf, value: &T) -> Result<(), StoreError> {
    if path.exists() && peek_blob_schema(&path) == Some(SCHEMA_VER) {
        return Ok(());
    }
    let bytes = rmp_serde::to_vec_named(value)?;
    let suffix = format!(
        "{}.{:?}.tmp",
        std::process::id(),
        std::thread::current().id()
    );
    let tmp = path.with_extension(format!("msgpack.{suffix}"));

    // The file handle lives only inside the closure, so it is closed before the rename.
    let staged = OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(&tmp)
        .and_then(|mut f| f.write_all(&bytes));
    if let Err(source) = staged {
        // Best-effort cleanup; the temp file may not even exist if `open` failed.
        let _ = std::fs::remove_file(&tmp);
        return Err(StoreError::Io { path: tmp, source });
    }

    if let Err(source) = std::fs::rename(&tmp, &path) {
        let _ = std::fs::remove_file(&tmp);
        return Err(StoreError::Io { path, source });
    }
    Ok(())
}
/// Accepts `found` only when it equals the current `SCHEMA_VER`; otherwise reports a
/// `StoreError::SchemaMismatch` carrying both versions.
fn check_schema(found: u16) -> Result<(), StoreError> {
    if found != SCHEMA_VER {
        return Err(StoreError::SchemaMismatch {
            found,
            expected: SCHEMA_VER,
        });
    }
    Ok(())
}
// Unit tests for lock-contention reporting and for the read-only / writer open paths.
#[cfg(test)]
mod tests {
use super::*;
// Without sidecar metadata, the fallback message must mention both `serve` and `watch`.
#[test]
fn locked_display_names_the_serve_holder() {
let err = StoreError::Locked {
path: PathBuf::from("/repo/.basemind/.lock"),
holder: None,
};
let msg = err.to_string();
assert!(
msg.contains("serve"),
"Locked message should name the `serve` holder, got: {msg}"
);
assert!(
msg.contains("watch"),
"Locked message should still mention `watch`, got: {msg}"
);
}
// With sidecar metadata, the message must carry the holder's command and pid.
#[test]
fn locked_message_names_actual_holder_from_sidecar() {
let err = StoreError::Locked {
path: PathBuf::from("/repo/.basemind/.lock"),
holder: Some(LockMeta {
command: "basemind scan".to_string(),
pid: 4321,
acquired_unix: 1_700_000_000,
}),
};
let msg = err.to_string();
assert!(
msg.contains("basemind scan"),
"message should name the actual holder command, got: {msg}"
);
assert!(
msg.contains("4321"),
"message should name the holder pid, got: {msg}"
);
}
// End-to-end: while a first lock is held, a second acquisition fails with a contention
// error that names the first holder's command (read back from the sidecar).
#[test]
fn second_acquisition_names_first_holders_command() {
let tmp = tempfile::tempdir().expect("tempdir");
let basemind_dir = tmp.path().join(".basemind");
std::fs::create_dir_all(&basemind_dir).expect("mkdir");
let _held = acquire_lock_as(&basemind_dir, LockHolder::Scan).expect("first lock");
let err = acquire_lock_as(&basemind_dir, LockHolder::Serve)
.expect_err("second acquisition must fail while the first holds the lock");
assert!(err.is_lock_contention(), "must be a contention error");
let msg = err.to_string();
assert!(
msg.contains("basemind scan"),
"second error should name the FIRST holder (scan), got: {msg}"
);
}
// A named (non-working) view without an index file must fail with `ViewNotScanned`.
#[test]
fn open_read_only_errors_on_never_scanned_named_view() {
let tmp = tempfile::tempdir().expect("tempdir");
let err = match Store::open_read_only(tmp.path(), "rev-deadbee") {
Ok(_) => panic!("named unscanned view must error, not silently open empty"),
Err(e) => e,
};
assert!(
matches!(&err, StoreError::ViewNotScanned { view } if view == "rev-deadbee"),
"expected ViewNotScanned, got: {err:?}"
);
assert!(
err.to_string().contains("rev-deadbee"),
"error names the view, got: {err}"
);
}
// The working view is the exception: it opens read-only with an empty index.
#[test]
fn open_read_only_allows_unscanned_working_view() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = Store::open_read_only(tmp.path(), VIEW_WORKING)
.expect("working view opens even when never scanned");
assert!(store.index.files.is_empty(), "empty working index");
}
// The writer path creates the directory of a named view on first open.
#[test]
fn open_writer_creates_named_view_for_first_scan() {
let tmp = tempfile::tempdir().expect("tempdir");
let store = Store::open(tmp.path(), "rev-cafe000")
.expect("writer creates a named view on first scan");
assert!(store.view_dir.exists(), "named view dir created by writer");
}
// `Locked` is classified as lock contention.
#[test]
fn fs2_advisory_lock_is_lock_contention() {
let err = StoreError::Locked {
path: PathBuf::from("/repo/.basemind/.lock"),
holder: None,
};
assert!(err.is_lock_contention());
}
// The index database's own `Locked` error is classified as lock contention too.
#[test]
fn fjall_internal_lock_is_lock_contention() {
let err = StoreError::Index(IndexError::Fjall(fjall::Error::Locked));
assert!(err.is_lock_contention());
}
// Unrelated errors must not be classified as lock contention.
#[test]
fn schema_mismatch_is_not_lock_contention() {
let err = StoreError::SchemaMismatch {
found: 1,
expected: 2,
};
assert!(!err.is_lock_contention());
}
}