use std::{
collections::BTreeMap,
path::{Path, PathBuf},
};
use anyhow::{Context, Result, anyhow, bail};
use lance_io::object_store::uri_to_url;
use serde::{Deserialize, Deserializer, Serialize, de};
use serde_json::Value;
use url::Url;
/// Parses a human-readable byte size such as `"128 MiB"`, `"1.5GB"` or `"4096"`.
///
/// The input is a non-negative decimal number optionally followed by a unit.
/// Surrounding whitespace and whitespace between the number and the unit are
/// ignored; units are matched case-insensitively:
///
/// | unit          | multiplier |
/// |---------------|------------|
/// | (none), `b`   | 1          |
/// | `k`, `kb`     | 1000       |
/// | `kib`         | 1024       |
/// | `m`, `mb`     | 1000^2     |
/// | `mib`         | 1024^2     |
/// | `g`, `gb`     | 1000^3     |
/// | `gib`         | 1024^3     |
/// | `t`, `tb`     | 1000^4     |
/// | `tib`         | 1024^4     |
///
/// A fractional byte count (e.g. `"1.5"`) is truncated toward zero. Exponent
/// notation is not supported: the first ASCII letter starts the unit, so
/// `"1e3"` is rejected as having the unknown unit `e3`.
///
/// # Errors
///
/// Returns a human-readable message when the input is empty, the numeric part
/// does not parse, the value is negative, the unit is unknown, or the result
/// does not fit in `usize`.
fn parse_byte_size(raw: &str) -> Result<usize, String> {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return Err("byte-size value is empty".to_owned());
    }
    // The unit begins at the first ASCII letter; everything before it is the number.
    let split = trimmed
        .find(|c: char| c.is_ascii_alphabetic())
        .unwrap_or(trimmed.len());
    let (number, unit) = trimmed.split_at(split);
    let number: f64 = number
        .trim()
        .parse()
        .map_err(|_| format!("byte-size value {raw:?} is not a number"))?;
    if !number.is_finite() || number < 0.0 {
        return Err(format!("byte-size value {raw:?} must be non-negative"));
    }
    let multiplier: f64 = match unit.trim().to_ascii_lowercase().as_str() {
        "" | "b" => 1.0,
        "k" | "kb" => 1_000.0,
        "kib" => 1_024.0,
        "m" | "mb" => 1_000_000.0,
        "mib" => 1_048_576.0,
        "g" | "gb" => 1_000_000_000.0,
        "gib" => 1_073_741_824.0,
        "t" | "tb" => 1_000_000_000_000.0,
        "tib" => 1_099_511_627_776.0,
        other => {
            return Err(format!(
                "byte-size unit {other:?} not recognized (try MiB / GiB)"
            ));
        }
    };
    let bytes = number * multiplier;
    if !bytes.is_finite() {
        return Err(format!("byte-size value {raw:?} overflows usize"));
    }
    // `usize::MAX as f64` rounds up to 2^64 on 64-bit targets, so comparing the
    // float against it lets exactly 2^64 through and `as usize` then saturates
    // silently. Converting through u128 (the float cast saturates, it never
    // wraps) and narrowing with `try_from` gives an exact range check.
    usize::try_from(bytes as u128).map_err(|_| format!("byte-size value {raw:?} overflows usize"))
}
fn deserialize_byte_size_opt<'de, D>(deserializer: D) -> Result<Option<usize>, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum Repr {
Bytes(u64),
Text(String),
}
let repr: Option<Repr> = Option::deserialize(deserializer)?;
match repr {
None => Ok(None),
Some(Repr::Bytes(value)) => usize::try_from(value).map(Some).map_err(de::Error::custom),
Some(Repr::Text(value)) => parse_byte_size(&value).map(Some).map_err(de::Error::custom),
}
}
/// Parses the `--data-dir` argument into a [`Url`] via `uri_to_url`.
///
/// # Errors
///
/// Returns the underlying failure wrapped in a context message that quotes
/// the offending input.
pub fn parse_data_dir(input: &str) -> Result<Url> {
    let parsed = uri_to_url(input);
    parsed.with_context(|| format!("invalid --data-dir {input:?}"))
}
/// Returns `true` when `url` uses one of the schemes treated as local
/// filesystem: `file` or `file+uring`.
pub fn is_local(url: &Url) -> bool {
    let scheme = url.scheme();
    scheme == "file" || scheme == "file+uring"
}
/// Converts a local URL (see [`is_local`]) into a filesystem path.
///
/// Returns `None` for any other scheme, or when the URL cannot be turned into
/// a file path.
pub fn local_path(url: &Url) -> Option<PathBuf> {
    if !is_local(url) {
        return None;
    }
    url.to_file_path().ok()
}
/// Builds the location of `suffix` underneath `base`.
///
/// For a local base the result is a plain filesystem path (joined with the
/// platform separator); otherwise it is the base URL text with any trailing
/// `/` removed, followed by `/` and `suffix`.
pub fn child_uri(base: &Url, suffix: &str) -> String {
    match local_path(base) {
        Some(dir) => dir.join(suffix).display().to_string(),
        None => {
            let root = base.as_str().trim_end_matches('/');
            format!("{root}/{suffix}")
        }
    }
}
/// Renders `url` for humans: local URLs are shown as their filesystem path,
/// everything else as the URL string.
pub fn display(url: &Url) -> String {
    local_path(url).map_or_else(|| url.to_string(), |path| path.display().to_string())
}
/// Converts a filesystem path into a `file://` [`Url`].
///
/// A relative path is first made absolute with `std::path::absolute`.
///
/// # Errors
///
/// Fails when the path cannot be absolutized or when `Url::from_file_path`
/// rejects the absolute path.
pub fn url_for_path(path: impl AsRef<Path>) -> Result<Url> {
    let given = path.as_ref();
    let resolved = if given.is_absolute() {
        given.to_path_buf()
    } else {
        std::path::absolute(given)
            .with_context(|| format!("failed to absolutize {}", given.display()))?
    };
    match Url::from_file_path(&resolved) {
        Ok(url) => Ok(url),
        Err(()) => Err(anyhow!(
            "failed to convert path {} into a file:// URL",
            resolved.display()
        )),
    }
}
/// Fully commented-out configuration template.
///
/// Every line of the literal starts with `#`, i.e. it is a TOML comment, so
/// the text documents the `[sources]`, `[embeddings]`, `[search]`, `[runtime]`
/// and `[storage]` sections without setting any value.
///
/// NOTE(review): presumably written out as the initial config file - confirm
/// against the callers, which are not visible here.
pub const DEFAULT_CONFIG_TOML: &str = "\
# pond configuration.
#
# pond ships built-in defaults, so every setting here is optional - delete this
# file and pond still works. Uncomment and edit to override.
# Where pond looks for source data to import. One entry per adapter type
# (`claude-code`, `codex-cli`, ...). `pond sync` with no arguments syncs every
# entry; `pond sync <adapter>` syncs just one. With an empty `[sources]`,
# `pond sync` runs an interactive discovery against the known default paths
# and writes the picks back here.
#
# Future wrap: pond is single-namespace in v1 (spec.md#wire-namespace-resolution); `[sources]` is
# flat here. When multi-namespace pond lands, source registration becomes
# per-tenant under `[namespaces.<ns>.sources.<adapter>]`. Pre-v1 the schema
# is breakable; the rename is operationally free until a real second tenant
# exists.
#
# [sources.claude-code]
# enabled = true
# path = \"~/.claude/projects\"
#
# [sources.codex-cli]
# enabled = true
# path = \"~/.codex/sessions\"
#
# Set `enabled = false` to keep the section but skip it on `pond sync`;
# re-enable via `pond sync <adapter>`.
# Embeddings. Search runs hybrid (vector + FTS) whenever the store has any
# vectors, and FTS-only otherwise - the model loads lazily on the first hybrid
# query, so there's no cost on FTS-only corpora. `model` selects the
# HuggingFace XLM-RoBERTa model; `dim` declares its output width and is baked
# into the messages.vector schema on table creation - it must equal the
# model's hidden_size and be a multiple of 8 (IVF_PQ subspace stride).
#
# Common pairings:
# model = \"intfloat/multilingual-e5-small\" dim = 384 (default)
# model = \"intfloat/multilingual-e5-base\" dim = 768
# model = \"intfloat/multilingual-e5-large\" dim = 1024
#
# A different-dim model needs a fresh data dir; pond enforces this at the
# schema boundary.
#
# [embeddings]
# model = \"intfloat/multilingual-e5-small\"
# dim = 384
# Search tuning. Leave unset for Lance defaults; set when tuning IVF_PQ recall
# against a corpus.
#
# `index_lag_threshold` is the minimum unindexed-fragment count before a
# per-intent append/rebuild runs in `pond index optimize`; the brute-force
# fallback keeps queries correct while fragments accumulate. Defaults to 4.
#
# [search]
# nprobes = 16
# refine_factor = 2
# index_lag_threshold = 4
# Long-running process caps. Both accept either a plain byte count or a
# humansize-style suffix (\"128 MiB\", \"1 GiB\"). Both are optional - leave
# unset to let pond pick the backend-aware default:
# local FS : index_cache = 256 MiB, metadata_cache = 128 MiB
# remote : index_cache = 2 GiB, metadata_cache = 512 MiB
# Lance's library defaults (6 GiB / 1 GiB) are too generous for a per-session
# `pond mcp` process; tightening them is what keeps RSS under the 500 MiB target
# without measurable latency regressions on typical agent-history corpora.
#
# [runtime]
# index_cache_bytes = \"256 MiB\"
# metadata_cache_bytes = \"128 MiB\"
# Object-store credentials and tuning, passed verbatim to Lance's
# `DatasetBuilder::with_storage_options`. Required only when `--data-dir` is
# an `s3://` / `gs://` / `az://` URI that needs auth or a non-default region.
# Keys follow the `object_store` crate's standard names. Environment
# variables of the same name are read by `object_store` automatically;
# values in this block override them. pond does not parse these.
#
# Future wrap: pond is single-namespace in v1 (spec.md#wire-namespace-resolution); `[storage]` is
# flat here on the assumption of one bucket per pond. When multi-namespace
# pond lands and tenants need separate buckets/regions, this becomes
# `[namespaces.<ns>.storage]`. Pre-v1 the schema is breakable; the rename is
# operationally free until a real second tenant exists.
#
# [storage]
# AWS_ACCESS_KEY_ID = \"...\"
# AWS_SECRET_ACCESS_KEY = \"...\"
# AWS_REGION = \"us-east-1\"
# AWS_ENDPOINT = \"https://minio.example.com\" # for self-hosted MinIO
# allow_http = \"true\" # only for non-TLS endpoints
";
/// Top-level configuration as read from the TOML config file.
///
/// Every section is optional and falls back to its `Default` when absent.
/// Unknown top-level keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
/// `[embeddings]`: embedding model id and vector width.
#[serde(default)]
pub embeddings: EmbeddingsConfig,
/// `[search]`: optional search tuning knobs.
#[serde(default)]
pub search: SearchConfig,
/// `[runtime]`: optional cache-size caps.
#[serde(default)]
pub runtime: RuntimeConfig,
/// `[sources.<adapter>]`: one free-form blob per adapter name. Only the
/// `enabled` key is interpreted in this file; the rest is passed through.
#[serde(default)]
pub sources: BTreeMap<String, Value>,
/// `[storage]`: string key/value pairs; not interpreted in this file.
#[serde(default)]
pub storage: BTreeMap<String, String>,
}
/// `[runtime]` section: optional cache-size caps, in bytes.
///
/// On input each field accepts either an integer byte count or a string with
/// a unit suffix (see `deserialize_byte_size_opt`). `None` means the setting
/// was left unset.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct RuntimeConfig {
/// Cap for the index cache, in bytes.
#[serde(default, deserialize_with = "deserialize_byte_size_opt")]
pub index_cache_bytes: Option<usize>,
/// Cap for the metadata cache, in bytes.
#[serde(default, deserialize_with = "deserialize_byte_size_opt")]
pub metadata_cache_bytes: Option<usize>,
}
/// `[search]` section: optional search tuning. Every field defaults to `None`
/// and unknown keys are rejected.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SearchConfig {
/// `nprobes` tuning value; not interpreted in this file.
#[serde(default)]
pub nprobes: Option<usize>,
/// `refine_factor` tuning value; not interpreted in this file.
#[serde(default)]
pub refine_factor: Option<u32>,
/// When set, `Config::load` forwards this value to
/// `crate::substrate::init_index_lag_threshold`.
#[serde(default)]
pub index_lag_threshold: Option<usize>,
}
/// `[embeddings]` section: which embedding model to use and its vector width.
///
/// A missing field takes its value from the `Default` impl (container-level
/// `serde(default)`); unknown keys are rejected. The values are checked by
/// `EmbeddingsConfig::validate`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields, default)]
pub struct EmbeddingsConfig {
/// Model identifier; must not be blank.
pub model: String,
/// Embedding width; must be a positive multiple of 8.
pub dim: usize,
}
impl Default for EmbeddingsConfig {
    /// Built-in defaults: the crate-wide default model id paired with the
    /// crate-wide default embedding dimension.
    fn default() -> Self {
        let model = crate::embed::DEFAULT_MODEL_ID.to_owned();
        let dim = crate::sessions::DEFAULT_EMBEDDING_DIM;
        Self { model, dim }
    }
}
/// Picks the data directory, in priority order:
///
/// 1. `explicit`, returned unchanged;
/// 2. `<xdg_data_home>/pond`, but only if `xdg_data_home` is an absolute path;
/// 3. `<home>/.local/share/pond`;
/// 4. `.pond`, resolved against the current directory.
///
/// # Errors
///
/// Fails when the chosen path cannot be converted into a `file://` URL.
pub fn resolve_data_dir(
    explicit: Option<Url>,
    xdg_data_home: Option<PathBuf>,
    home: Option<PathBuf>,
) -> Result<Url> {
    if let Some(url) = explicit {
        return Ok(url);
    }
    // A relative XDG value is ignored rather than trusted.
    let xdg = xdg_data_home.filter(|dir| dir.is_absolute());
    let chosen = match (xdg, home) {
        (Some(xdg), _) => xdg.join("pond"),
        (None, Some(home)) => home.join(".local").join("share").join("pond"),
        (None, None) => PathBuf::from(".pond"),
    };
    url_for_path(chosen)
}
/// Picks the default config file location, in priority order:
///
/// 1. `<xdg_config_home>/pond/config.toml`, but only if `xdg_config_home` is
///    an absolute path;
/// 2. `<home>/.config/pond/config.toml`;
/// 3. the relative path `.pond.toml`.
pub fn default_config_path(xdg_config_home: Option<PathBuf>, home: Option<PathBuf>) -> PathBuf {
    // A relative XDG value is ignored rather than trusted.
    let config_root = xdg_config_home
        .filter(|dir| dir.is_absolute())
        .or_else(|| home.map(|dir| dir.join(".config")));
    match config_root {
        Some(root) => root.join("pond").join("config.toml"),
        None => PathBuf::from(".pond.toml"),
    }
}
impl Config {
pub fn load(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
let config = if path.exists() {
let text = std::fs::read_to_string(path)
.with_context(|| format!("failed to read config {}", path.display()))?;
toml::from_str::<Self>(&text)
.with_context(|| format!("failed to parse config {}", path.display()))?
} else {
Self::default()
};
config.embeddings.validate()?;
config.embeddings.install_runtime();
if let Some(threshold) = config.search.index_lag_threshold {
crate::substrate::init_index_lag_threshold(threshold);
}
Ok(config)
}
pub fn resolve_sources(&self, adapter: Option<&str>) -> Result<Vec<(String, Value)>> {
match adapter {
None => Ok(self
.sources
.iter()
.filter_map(|(name, blob)| take_enabled(name, blob))
.collect()),
Some(name) => {
let blob = self
.sources
.get(name)
.ok_or_else(|| anyhow!("no [sources.{name}] entry in config"))?;
take_enabled(name, blob).map(|entry| vec![entry]).ok_or_else(|| {
anyhow!(
"source [{name}] is disabled (enabled = false); run `pond sync {name}` to re-enable"
)
})
}
}
}
pub fn disabled_source_names(&self) -> Vec<&str> {
self.sources
.iter()
.filter_map(|(name, blob)| {
let enabled = blob
.get("enabled")
.and_then(Value::as_bool)
.unwrap_or(false);
if enabled { None } else { Some(name.as_str()) }
})
.collect()
}
}
/// Returns `(name, blob)` with the `enabled` key removed when the entry is
/// enabled, and `None` otherwise.
///
/// An entry is enabled only if its `enabled` key is the boolean `true`; a
/// missing or non-boolean value counts as disabled.
fn take_enabled(name: &str, blob: &Value) -> Option<(String, Value)> {
    let switched_on = matches!(blob.get("enabled"), Some(Value::Bool(true)));
    if !switched_on {
        return None;
    }
    // Hand back a copy without the bookkeeping flag.
    let mut stripped = blob.clone();
    if let Value::Object(fields) = &mut stripped {
        fields.remove("enabled");
    }
    Some((name.to_owned(), stripped))
}
/// Expands a leading `~` in `path` against `home`.
///
/// * `~` becomes `home`;
/// * `~/rest` becomes `home` joined with `rest`;
/// * anything else (including `~user/...` and non-UTF-8 paths) is returned
///   unchanged.
pub fn expand_home_under(path: &Path, home: &Path) -> PathBuf {
    match path.to_str() {
        Some("~") => home.to_path_buf(),
        Some(text) => match text.strip_prefix("~/") {
            Some(tail) => home.join(tail),
            None => path.to_path_buf(),
        },
        None => path.to_path_buf(),
    }
}
impl EmbeddingsConfig {
    /// Checks that the section is usable.
    ///
    /// # Errors
    ///
    /// Fails when `model` is empty or whitespace-only, or when `dim` is zero
    /// or not a multiple of 8.
    pub fn validate(&self) -> Result<()> {
        let model_is_blank = self.model.trim().is_empty();
        if model_is_blank {
            bail!("embeddings.model must be a non-empty HuggingFace model id");
        }
        let dim_is_valid = self.dim != 0 && self.dim.is_multiple_of(8);
        if !dim_is_valid {
            bail!(
                "embeddings.dim = {} must be a positive multiple of 8 (IVF_PQ subspace stride)",
                self.dim,
            );
        }
        Ok(())
    }

    /// Hands the model id and the dimension to the crate-level initializers
    /// (`crate::embed::init_model_id`, `crate::sessions::init_embedding_dim`).
    pub fn install_runtime(&self) {
        let model_id = self.model.clone();
        crate::embed::init_model_id(model_id);
        crate::sessions::init_embedding_dim(self.dim);
    }
}
// Unit tests for config loading, data-dir resolution, home expansion and
// source resolution.
#[cfg(test)]
mod tests {
// Tests may unwrap/expect freely: a panic is the failure report.
#![allow(clippy::expect_used, clippy::unwrap_used)]
use super::*;
use serde_json::Value;
use tempfile::TempDir;
// The defaults validate; a blank model, a dim that is not a multiple of 8,
// and a zero dim are each rejected.
#[test]
fn validate_catches_empty_model_and_bad_dim() {
assert!(EmbeddingsConfig::default().validate().is_ok());
let bad_model = EmbeddingsConfig {
model: " ".to_owned(),
dim: 768,
};
assert!(bad_model.validate().is_err());
let bad_dim = EmbeddingsConfig {
model: "intfloat/multilingual-e5-base".to_owned(),
dim: 100,
};
assert!(bad_dim.validate().is_err());
let zero_dim = EmbeddingsConfig {
model: "intfloat/multilingual-e5-base".to_owned(),
dim: 0,
};
assert!(zero_dim.validate().is_err());
}
// A config path that does not exist is not an error: defaults are used.
#[test]
fn config_load_missing_file_falls_back_to_builtin() {
let config = Config::load("/nonexistent/pond-config-xyz.toml").unwrap();
assert_eq!(config.embeddings, EmbeddingsConfig::default());
}
// The shipped template is all comments, so loading it must give the same
// embeddings settings as the built-in defaults.
#[test]
fn default_config_toml_loads_to_the_builtin_defaults() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("config.toml");
std::fs::write(&path, DEFAULT_CONFIG_TOML).unwrap();
let config = Config::load(&path).unwrap();
assert_eq!(config.embeddings, EmbeddingsConfig::default());
assert_eq!(config.embeddings.model, crate::embed::DEFAULT_MODEL_ID);
assert_eq!(
config.embeddings.dim,
crate::sessions::DEFAULT_EMBEDDING_DIM
);
}
// Walks the priority chain: explicit URL, absolute XDG dir, home fallback
// (a relative XDG value is skipped), and finally the relative `.pond`.
#[test]
fn resolve_data_dir_follows_explicit_then_xdg_then_home() {
let explicit = parse_data_dir("/explicit").unwrap();
let resolved = resolve_data_dir(
Some(explicit.clone()),
Some(PathBuf::from("/xdg")),
Some(PathBuf::from("/home")),
)
.unwrap();
assert_eq!(resolved, explicit);
let resolved = resolve_data_dir(
None,
Some(PathBuf::from("/xdg")),
Some(PathBuf::from("/home")),
)
.unwrap();
assert!(is_local(&resolved));
assert_eq!(local_path(&resolved).unwrap(), PathBuf::from("/xdg/pond"));
let resolved = resolve_data_dir(
None,
Some(PathBuf::from("relative")),
Some(PathBuf::from("/home")),
)
.unwrap();
assert_eq!(
local_path(&resolved).unwrap(),
PathBuf::from("/home/.local/share/pond"),
);
let resolved = resolve_data_dir(None, None, None).unwrap();
assert!(is_local(&resolved));
assert!(
local_path(&resolved).unwrap().ends_with(".pond"),
"fallback path should end with .pond: {resolved}",
);
}
// Only `~` and `~/...` are expanded; absolute paths and `~user/...` pass
// through untouched.
#[test]
fn expand_home_under_handles_tilde_forms() {
let home = Path::new("/srv/me");
assert_eq!(
expand_home_under(Path::new("~"), home),
PathBuf::from("/srv/me")
);
assert_eq!(
expand_home_under(Path::new("~/.codex/sessions"), home),
PathBuf::from("/srv/me/.codex/sessions"),
);
assert_eq!(
expand_home_under(Path::new("/etc/passwd"), home),
PathBuf::from("/etc/passwd"),
);
assert_eq!(
expand_home_under(Path::new("~user/elsewhere"), home),
PathBuf::from("~user/elsewhere"),
);
}
// Two enabled sources and one disabled: `None` returns the enabled pair with
// `enabled` stripped, a named enabled adapter returns just that one, a named
// disabled adapter and an unknown adapter both error.
#[test]
fn resolve_sources_returns_one_or_all_or_errors() {
let temp = TempDir::new().unwrap();
let body = "\
[sources.claude-code]
enabled = true
path = \"/srv/claude\"
[sources.codex-cli]
enabled = true
path = \"/srv/codex\"
[sources.opencode]
enabled = false
";
let path = temp.path().join("config.toml");
std::fs::write(&path, body).expect("write config");
let config = Config::load(&path).unwrap();
let all = config.resolve_sources(None).unwrap();
assert_eq!(all.len(), 2);
let names: Vec<_> = all.iter().map(|(n, _)| n.as_str()).collect();
assert!(names.contains(&"claude-code"));
assert!(names.contains(&"codex-cli"));
for (_, blob) in &all {
assert!(blob.get("enabled").is_none(), "enabled should be stripped");
}
let one = config.resolve_sources(Some("codex-cli")).unwrap();
assert_eq!(one.len(), 1);
assert_eq!(one[0].0, "codex-cli");
assert_eq!(
one[0].1.get("path").and_then(Value::as_str),
Some("/srv/codex"),
);
let disabled = config.resolve_sources(Some("opencode"));
let err = disabled
.expect_err("disabled adapter must error")
.to_string();
assert!(err.contains("enabled = false"), "got: {err}");
assert!(err.contains("pond sync opencode"), "got: {err}");
assert!(config.resolve_sources(Some("nope")).is_err());
assert_eq!(config.disabled_source_names(), vec!["opencode"]);
}
// A `memory://` URI parses but is neither local nor convertible to a path.
#[test]
fn memory_uri_is_classified_as_remote() {
let url = parse_data_dir("memory:///pond-remote-test").expect("memory uri parses");
assert!(
!is_local(&url),
"memory:// is not a local-filesystem URL: {url}",
);
assert!(
local_path(&url).is_none(),
"local_path must return None for non-file schemes",
);
}
}