use std::{
collections::HashMap,
fs::{self, File},
io::{self, IsTerminal},
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use chrono::{DateTime, Utc};
use anyhow::{Context, bail};
use clap::{ArgGroup, CommandFactory, Parser, Subcommand, ValueEnum};
use comfy_table::{Attribute, Cell, CellAlignment, ContentArrangement, Table, presets::NOTHING};
use indicatif::{ProgressBar, ProgressStyle};
use pond::{
PROTOCOL_VERSION, adapter,
config::{self, Config, DEFAULT_CONFIG_TOML},
embed::{BatchProgress, CandleEmbedder, EmbedSummary, EmbedWorker, Embedder, LazyEmbedder},
handlers::{self, IngestSummary, SessionOutcome, SyncEvent, SyncStatus},
sessions::{
EmbeddingProgress, LanceArchiveCounts, LanceArchiveExport, LanceArchiveImport,
MESSAGES_FTS_INDEX, MESSAGES_VECTOR_INDEX, OptimizeOutcome, RowTotals, Store,
},
substrate::{
self, CheckFailure, CredsBinding, IndexStatus, MaintenancePolicy, OptimizeEvent,
OptimizeProgressFn, PhaseOutcome, ResolvedStorage, StorageUrl, TableSizes,
VECTOR_INDEX_ACTIVATION_ROWS, default_cleanup_older_than,
},
transport::{self, AppState},
wire::{
self, ErrorEnvelope, GetEnvelope, GetRequest, ProjectFilter, SearchEnvelope, SearchFilters,
SearchModeWire, SearchRequest, SessionFrom,
},
};
mod init;
mod schedule;
mod syncstate;
/// Manifest record for a pond archive; it can be serialized and deserialized through serde.
///
/// NOTE(review): the per-field notes below are inferred from names and types only.
/// Confirm them against the code that writes and reads the manifest.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct PondArchiveManifest {
// Version number of the archive layout itself (presumably bumped on format changes; verify).
archive_version: u32,
// Version string of the pond build involved (presumably the one that created the archive).
pond_version: String,
// Wire protocol version; same integer width as a protocol version number. TODO confirm it is `PROTOCOL_VERSION`.
protocol_version: u16,
// UTC timestamp recorded in the manifest (by name, the archive creation time).
created_at: DateTime<Utc>,
// Row counts carried by the archive, as reported by the sessions layer.
rows: LanceArchiveCounts,
// Versions of the source the archive was taken from, as reported by the sessions layer.
source_versions: pond::sessions::LanceArchiveVersions,
// Name of the embedding model associated with the archived data.
embedding_model: String,
// Dimensionality of the embedding vectors associated with the archived data.
embedding_dim: usize,
}
// Search modes selectable on the command line; converted to `SearchModeWire` through `From`.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Clone, Copy, ValueEnum)]
pub enum CliSearchMode {
// Maps to `SearchModeWire::Fts`.
Fts,
// Maps to `SearchModeWire::Vector`.
Vector,
}
// Result orderings selectable on the command line; converted to `wire::SortBy` through `From`.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Clone, Copy, ValueEnum)]
pub enum CliSortBy {
// Maps to `wire::SortBy::Relevance`.
Relevance,
// Maps to `wire::SortBy::Recency`.
Recency,
}
// Stages of `pond optimize` that `--only` / `--skip` can name.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum OptimizeStage {
// The embedding stage.
Embed,
// The index stage.
Index,
}
// Transports accepted by `pond serve --transport`.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum ServeTransport {
// Serve over HTTP on the configured host and port.
Http,
// Serve MCP over stdio.
Stdio,
}
/// Maps the CLI search-mode flag onto its wire-protocol counterpart, variant for variant.
impl From<CliSearchMode> for SearchModeWire {
    fn from(mode: CliSearchMode) -> Self {
        // Exhaustive on purpose: a new CLI variant must pick a wire variant explicitly.
        match mode {
            CliSearchMode::Fts => Self::Fts,
            CliSearchMode::Vector => Self::Vector,
        }
    }
}
/// Maps the CLI sort flag onto its wire-protocol counterpart, variant for variant.
impl From<CliSortBy> for wire::SortBy {
    fn from(sort_by: CliSortBy) -> Self {
        // Exhaustive on purpose: a new CLI variant must pick a wire variant explicitly.
        match sort_by {
            CliSortBy::Relevance => Self::Relevance,
            CliSortBy::Recency => Self::Recency,
        }
    }
}
// Output formats accepted by `pond sql --format`.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum CliSqlFormat {
// Inline text result (run as `pond::sql::Mode::Inline`).
Text,
// Newline-delimited JSON export.
Ndjson,
// Parquet export; `main` refuses it unless `--output-file` is given.
Parquet,
}
// Which end of a session `pond get --session-from` reads from; converted to `SessionFrom` through `From`.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Clone, Copy, ValueEnum)]
enum CliSessionFrom {
// Maps to `SessionFrom::Start`.
Start,
// Maps to `SessionFrom::End`.
End,
}
/// Maps the CLI `--session-from` flag onto its wire-protocol counterpart, variant for variant.
impl From<CliSessionFrom> for SessionFrom {
    fn from(value: CliSessionFrom) -> Self {
        // Exhaustive on purpose: a new CLI variant must pick a wire variant explicitly.
        match value {
            CliSessionFrom::Start => Self::Start,
            CliSessionFrom::End => Self::End,
        }
    }
}
use serde_json::{Value, json};
use tokio::io::AsyncWriteExt;
use tracing_indicatif::IndicatifLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, fmt};
use url::Url;
/// Resolves the default local storage location and parses it into a `StorageUrl`.
///
/// Reads `XDG_DATA_HOME` and `HOME` from the environment and hands both to
/// `pond::config::default_storage_path`.
///
/// # Errors
/// Propagates a failure from `default_storage_path` or from `StorageUrl::parse`.
fn default_local_storage() -> anyhow::Result<StorageUrl> {
    let xdg_data_home = std::env::var_os("XDG_DATA_HOME").map(PathBuf::from);
    let home = std::env::var_os("HOME").map(PathBuf::from);
    let location = pond::config::default_storage_path(xdg_data_home, home)?;
    StorageUrl::parse(location.as_str())
}
/// Returns the default cache directory.
///
/// Reads `XDG_CACHE_HOME` and `HOME` from the environment and hands both to
/// `pond::config::default_cache_path`.
fn default_cache_dir() -> PathBuf {
    let xdg_cache_home = std::env::var_os("XDG_CACHE_HOME").map(PathBuf::from);
    let home = std::env::var_os("HOME").map(PathBuf::from);
    pond::config::default_cache_path(xdg_cache_home, home)
}
/// clap value parser for `--storage-path`.
///
/// The keywords `local` and `default` (case-insensitive, surrounding whitespace ignored)
/// select the default local store; anything else is handed to `StorageUrl::parse` unchanged.
fn parse_storage_path(input: &str) -> anyhow::Result<StorageUrl> {
    let keyword = input.trim().to_ascii_lowercase();
    match keyword.as_str() {
        "local" | "default" => default_local_storage(),
        // Note: the raw, untrimmed input is parsed here, not the normalized keyword.
        _ => StorageUrl::parse(input),
    }
}
/// Searches the directories listed in `PATH`, in order, for an executable called `name`.
///
/// Returns the first `dir/name` that `is_executable` accepts, or `None` when `PATH`
/// is unset or no directory contains a match.
fn find_on_path(name: &str) -> Option<PathBuf> {
    let search_path = std::env::var_os("PATH")?;
    for dir in std::env::split_paths(&search_path) {
        let candidate = dir.join(name);
        if is_executable(&candidate) {
            return Some(candidate);
        }
    }
    None
}
/// Reports whether `path` is a regular file with at least one execute bit set.
///
/// Any metadata error (missing path, permission denied, ...) counts as "not executable".
#[cfg(unix)]
fn is_executable(path: &Path) -> bool {
    use std::os::unix::fs::PermissionsExt;

    // Owner, group, or other execute bit: any one of them is enough.
    const ANY_EXEC_BIT: u32 = 0o111;

    match std::fs::metadata(path) {
        Ok(meta) => meta.is_file() && (meta.permissions().mode() & ANY_EXEC_BIT) != 0,
        Err(_) => false,
    }
}
/// Non-unix fallback: there are no permission bits to inspect here, so any
/// existing regular file at `path` is treated as executable.
#[cfg(not(unix))]
fn is_executable(path: &Path) -> bool {
path.is_file()
}
/// Styling for clap's help output: bold section headers and usage line,
/// cyan literals, dimmed placeholders. Passed to clap via `styles = STYLES` on `Cli`.
const STYLES: clap::builder::styling::Styles = clap::builder::styling::Styles::styled()
.header(anstyle::Style::new().bold())
.usage(anstyle::Style::new().bold())
.literal(anstyle::AnsiColor::Cyan.on_default())
.placeholder(anstyle::Style::new().dimmed());
static VERSION: std::sync::LazyLock<String> = std::sync::LazyLock::new(|| {
if cfg!(debug_assertions) {
format!("{}-dev", env!("CARGO_PKG_VERSION"))
} else {
env!("CARGO_PKG_VERSION").to_owned()
}
});
static LONG_VERSION: std::sync::LazyLock<String> = std::sync::LazyLock::new(|| {
let target = format!("{}-{}", std::env::consts::ARCH, std::env::consts::OS);
match option_env!("POND_BUILD_COMMIT") {
Some(commit) if !commit.is_empty() => format!("{} ({commit} {target})", *VERSION),
_ => format!("{} ({target})", *VERSION),
}
});
/// Hand-written clap help template for the top-level `pond --help` screen.
///
/// The command list is spelled out manually and grouped by theme (Setup, Data flow,
/// Query, Serve, Shell), so it has to be kept in sync by hand with the `Command` enum.
/// `{about-with-newline}`, `{version}`, `{usage-heading}`, `{usage}`, `{options}` and
/// `{after-help}` are placeholders filled in by clap.
const HELP_TEMPLATE: &str = "\
{about-with-newline}pond {version}
{usage-heading} {usage}
Commands:
Setup
init Set up pond (idempotent: safe to re-run)
adapters Choose which adapters pond sync ingests
storage Probe and switch storage destinations
creds Manage URL-scoped credential sets
schedule Manage the automatic sync schedule
config Inspect configuration
Data flow
sync Make pond current: import, embed, index
optimize Embed the backlog, then fold the indexes
copy Copy data between stores, archives, JSONL
Query
search Search stored messages
get Fetch a session or message
sql Run one read-only SQL query
status Show pond health, data, and adapters
Serve
serve Run the HTTP API server
mcp Serve the MCP tools over stdio
Shell
completions Generate shell completions
skill Print the agent-onboarding SKILL.md
help Print this message or a subcommand's help
Options:
{options}{after-help}";
// Top-level command-line definition for the `pond` binary.
// Comments here are deliberately `//`, not `///`: clap's derive turns doc comments
// into about/help text, which would change the rendered help.
#[derive(Debug, Parser)]
#[command(
name = "pond",
version = VERSION.as_str(),
long_version = LONG_VERSION.as_str(),
styles = STYLES,
max_term_width = 100,
help_template = HELP_TEMPLATE,
after_long_help = "\
Getting started:
pond init set up storage, adapters, MCP, and scheduling
pond sync import new sessions, embed, update indexes
pond search \"that auth refactor\" find past work
claude mcp add -s user pond -- pond mcp register pond as an MCP server in Claude Code
Every command documents itself: `pond <command> --help` carries examples."
)]
struct Cli {
// The subcommand to run.
#[command(subcommand)]
command: Command,
// Global store/config selectors (`--storage-path`, `--config-file`).
#[command(flatten)]
store: StoreArgs,
// Verbosity flags from `clap_verbosity_flag`; the default level is WARN.
#[command(flatten)]
#[command(next_help_heading = "Global options")]
verbose: clap_verbosity_flag::Verbosity<clap_verbosity_flag::WarnLevel>,
}
// Global options selecting which store and which config file a command works against.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, clap::Args)]
#[command(next_help_heading = "Global options")]
struct StoreArgs {
// `--storage-path` / `POND_STORAGE_PATH`: explicit storage URL, parsed by `parse_storage_path`
// (so the keywords `local` / `default` are accepted). The env value is hidden in help.
#[arg(
long,
global = true,
env = "POND_STORAGE_PATH",
hide_env_values = true,
value_parser = parse_storage_path,
value_name = "URL"
)]
storage_path: Option<StorageUrl>,
// `--config-file` / `POND_CONFIG_FILE`: explicit config file path; when absent,
// `config_path` falls back to the default location.
#[arg(
long = "config-file",
global = true,
env = "POND_CONFIG_FILE",
hide_env_values = true,
value_name = "PATH"
)]
config: Option<PathBuf>,
}
// Every top-level `pond` subcommand. `display_order` mirrors the grouping that
// `HELP_TEMPLATE` spells out by hand; the `after_long_help` strings carry the examples
// shown by `pond <command> --help`.
// Comments are deliberately `//`, not `///`: clap's derive turns doc comments into help text.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Subcommand)]
enum Command {
// `pond init`: setup flow, implemented in the `init` module.
#[command(after_long_help = "Examples:
pond init interactive setup (or repair)
pond init --yes accept defaults, no prompts
pond init --storage-path s3://bucket/pond preset storage, prompt for the rest
pond init --adapters claude-code,codex-cli --yes
pond init --every 1h --yes also register an hourly sync")]
#[command(display_order = 1)]
Init(init::InitArgs),
// `pond sync [ADAPTER]`: forwarded to `run_sync` as a `SyncInvocation`.
#[command(after_long_help = "Examples:
pond sync sync every enabled adapter
pond sync claude-code sync one enabled adapter
pond sync --dry-run preview what the next sync would read
pond sync codex-cli --path ~/backup one-off path override, config untouched
pond sync --verify full re-read; heal anything the skip missed")]
#[command(display_order = 7)]
Sync {
// Optional positional adapter name.
adapter: Option<String>,
// `--path DIR`.
#[arg(long, value_name = "DIR")]
path: Option<PathBuf>,
// `--verify`.
#[arg(long)]
verify: bool,
// `--dry-run`.
#[arg(long)]
dry_run: bool,
// `--no-wait`.
#[arg(long)]
no_wait: bool,
// `--format`, defaults to text.
#[arg(long, value_enum, default_value_t = OutputFormat::Text)]
format: OutputFormat,
},
// `pond adapters <subcommand>`.
#[command(after_long_help = "Examples:
pond adapters list configured + detected adapters
pond adapters discover probe this machine, pick what to enable
pond adapters enable claude-code enable one (discovers its path if needed)
pond adapters disable opencode stop syncing one, keep it on record")]
#[command(display_order = 2)]
Adapters {
#[command(subcommand)]
command: AdaptersCmd,
},
// `pond status`.
#[command(after_long_help = "Examples:
pond status the one-screen overview
pond status --hosts which machines feed this store, and when
pond status --include-subagents count each subagent as its own adapter")]
#[command(display_order = 13)]
Status {
// `--include-subagents`: passed through to `Store::adapter_names`.
#[arg(long)]
include_subagents: bool,
// `--hosts`: additionally query and render per-host ingest activity.
#[arg(long)]
hosts: bool,
// `--format`, defaults to text.
#[arg(long, value_enum, default_value_t = OutputFormat::Text)]
format: OutputFormat,
},
// `pond search QUERY`: builds a `SearchRequest`.
#[command(after_long_help = "Examples:
pond search \"lance compaction tuning\"
pond search \"auth retry\" --project pond --limit 5
pond search \"merge_insert\" --mode fts --sort-by recency
pond search \"migration plan\" --from-date 2026-05-01 --format json")]
#[command(display_order = 10)]
Search {
// Positional query text.
query: String,
// `--namespace`, defaults to "local".
#[arg(long, default_value = "local")]
namespace: String,
// `--limit`, defaults to 10.
#[arg(long, default_value_t = 10)]
limit: usize,
// `--mode`; when omitted the wire type's `Default` is used.
#[arg(long, value_enum)]
mode: Option<CliSearchMode>,
// `--sort-by`; when omitted the wire type's `Default` is used.
#[arg(long, value_enum)]
sort_by: Option<CliSortBy>,
// `--project`, parsed by `parse_project_filter` (`re:` / `lit:` prefixes).
#[arg(long, value_parser = parse_project_filter)]
project: Option<ProjectFilter>,
// `--session-id ID`.
#[arg(long, value_name = "ID")]
session_id: Option<String>,
// `--from-date`; passed through as a raw string.
#[arg(long)]
from_date: Option<String>,
// `--to-date`; passed through as a raw string.
#[arg(long)]
to_date: Option<String>,
// `--min-score`, defaults to 0.0.
#[arg(long, default_value_t = 0.0)]
min_score: f64,
// `--explain`: print the search plans instead of running the search.
#[arg(long)]
explain: bool,
// `--format`, defaults to text.
#[arg(long, value_enum, default_value_t = OutputFormat::Text)]
format: OutputFormat,
},
// `pond get`: exactly one selector group member (`--session-id` or `--message-id`)
// is required; the `session_*` options conflict with `--message-id` and the
// `message_context_*` options conflict with `--session-id`.
#[command(after_long_help = "Examples:
pond get --session-id 58a96901-4a4f-40be-a3c1-62419ec8c580
pond get --session-id <ID> --session-from end most recent messages
pond get --session-id <ID> --session-after-message-id <ID> page down
pond get --message-id <ID> --message-context-before 3 --message-context-after 3")]
#[command(group(ArgGroup::new("get_selector")
.required(true)
.args(["session_id", "message_id"])))]
#[command(display_order = 11)]
Get {
// `--session-id ID`.
#[arg(long, value_name = "ID")]
session_id: Option<String>,
// `--message-id ID`.
#[arg(long, value_name = "ID")]
message_id: Option<String>,
// `--namespace`, defaults to "local".
#[arg(long, default_value = "local")]
namespace: String,
// `--session-limit`, defaults to 20.
#[arg(long, default_value_t = 20, conflicts_with = "message_id")]
session_limit: usize,
// `--session-from`, defaults to `start`.
#[arg(
long,
value_enum,
default_value_t = CliSessionFrom::Start,
conflicts_with = "message_id"
)]
session_from: CliSessionFrom,
// `--session-after-message-id ID`.
#[arg(long, value_name = "ID", conflicts_with = "message_id")]
session_after_message_id: Option<String>,
// `--session-before-message-id ID`.
#[arg(long, value_name = "ID", conflicts_with = "message_id")]
session_before_message_id: Option<String>,
// `--message-context-before`, defaults to 3.
#[arg(long, default_value_t = 3, conflicts_with = "session_id")]
message_context_before: usize,
// `--message-context-after`, defaults to 3.
#[arg(long, default_value_t = 3, conflicts_with = "session_id")]
message_context_after: usize,
// `--format`, defaults to text.
#[arg(long, value_enum, default_value_t = OutputFormat::Text)]
format: OutputFormat,
},
// `pond sql QUERY`.
#[command(after_long_help = "Examples:
pond sql \"SELECT count(*) FROM sessions\"
pond sql \"SELECT session_id, ts, role FROM messages WHERE contains_tokens(search_text, 'occ retry') LIMIT 20\"
pond sql \"SELECT * FROM messages\" --format parquet -o messages.parquet")]
#[command(display_order = 12)]
Sql {
// Positional SQL text.
sql: String,
// `--format`, defaults to text.
#[arg(long, value_enum, default_value_t = CliSqlFormat::Text)]
format: CliSqlFormat,
// `--limit`, defaults to 100; `main` caps it at `pond::sql::MAX_INLINE_ROWS`.
#[arg(long, default_value_t = 100)]
limit: usize,
// `--output-file` / `-o`; mandatory for the parquet format.
#[arg(long, short = 'o')]
output_file: Option<PathBuf>,
},
// `pond serve`.
#[command(after_long_help = "Examples:
pond serve HTTP on 127.0.0.1:9797
pond serve --port 8080
pond serve --transport stdio same as `pond mcp`")]
#[command(display_order = 14)]
Serve {
// `--transport`, defaults to http.
#[arg(long, value_enum, default_value_t = ServeTransport::Http)]
transport: ServeTransport,
// `--host` / `POND_HOST`, defaults to 127.0.0.1.
#[arg(
long,
env = "POND_HOST",
hide_env_values = true,
default_value = "127.0.0.1"
)]
host: String,
// `--port` / `POND_PORT`, defaults to 9797.
#[arg(
long,
env = "POND_PORT",
hide_env_values = true,
default_value_t = 9797
)]
port: u16,
},
// `pond mcp`: MCP over stdio, no options.
#[command(after_long_help = "Examples:
claude mcp add -s user pond -- pond mcp register in Claude Code
codex mcp add pond -- pond mcp register in Codex CLI")]
#[command(display_order = 15)]
Mcp {},
// `pond schedule <subcommand>`, implemented in the `schedule` module.
#[command(after_long_help = "Examples:
pond schedule start every 5 minutes (the default)
pond schedule start --every 15m
pond schedule status
pond schedule logs
pond schedule stop")]
#[command(display_order = 5)]
Schedule {
#[command(subcommand)]
command: schedule::ScheduleCmd,
},
// `pond storage <subcommand>`.
#[command(after_long_help = "Examples:
pond storage check s3+https://host/bucket/pond probe a destination end-to-end
pond storage use s3+https://host/bucket/pond probe, then switch the destination")]
#[command(display_order = 3)]
Storage {
#[command(subcommand)]
command: StorageCmd,
},
// `pond creds <subcommand>`.
#[command(after_long_help = "Examples:
pond creds add interactive: name (default), key, hidden secret
pond creds add work --scope s3+https://host/work
pond creds list
pond creds delete work")]
#[command(display_order = 4)]
Creds {
#[command(subcommand)]
command: CredsCmd,
},
// `pond copy --from X --to Y`: forwarded to `run_copy`.
#[command(after_long_help = "Examples:
pond copy --from local --to s3+https://host/bucket/pond migrate the local store to a bucket
pond copy --from @ --to backup/2026-06-16.pond snapshot the configured store to an archive
pond copy --from backup/2026-06-16.pond --to @ restore an archive into the configured store
pond copy --from @ --to s3://bucket/pond --verify-only re-check membership, copy nothing
pond copy --from @ --to - | jq . stream the configured store as JSONL")]
#[command(display_order = 9)]
Copy {
// `--from URL|FILE`; kept as a raw string for `run_copy` to interpret.
#[arg(long, value_name = "URL|FILE")]
from: String,
// `--to URL|FILE`; kept as a raw string for `run_copy` to interpret.
#[arg(long, value_name = "URL|FILE")]
to: String,
// `--verify-only`.
#[arg(long)]
verify_only: bool,
},
// `pond config <subcommand>`; `flatten_help` inlines the subcommand help.
#[command(flatten_help = true)]
#[command(after_long_help = "Examples:
pond config show every setting, its value, and where it came from
pond config schema > ~/.config/pond/config.toml start from the annotated template")]
#[command(display_order = 6)]
Config {
#[command(subcommand)]
command: ConfigCmd,
},
// `pond completions SHELL`: emits a completion script via `clap_complete`.
#[command(after_long_help = "Install:
bash: pond completions bash > ~/.local/share/bash-completion/completions/pond
zsh: pond completions zsh > \"${fpath[1]}/_pond\"
fish: pond completions fish > ~/.config/fish/completions/pond.fish
Homebrew and nix packages ship these pre-installed.")]
#[command(display_order = 16)]
Completions {
// Positional target shell.
#[arg(value_enum)]
shell: clap_complete::Shell,
},
// `pond skill`: prints the bundled `SKILL.md`.
#[command(display_order = 17)]
Skill,
// `pond optimize`: `--rebuild` and `--drop-index` each conflict with all the other
// options of this command (and with each other).
#[command(after_long_help = "Examples:
pond optimize embed any backlog, then fold indexes
pond optimize --only index fold indexes only
pond optimize --only embed embed only
pond optimize --force-embed re-embed stale rows after a model change")]
#[command(display_order = 8)]
Optimize {
// `--only STAGE`.
#[arg(long, value_enum)]
only: Option<OptimizeStage>,
// `--skip STAGE`, repeatable.
#[arg(long, value_enum)]
skip: Vec<OptimizeStage>,
// `--force-embed`.
#[arg(long)]
force_embed: bool,
// `--rebuild`.
#[arg(long, conflicts_with_all = ["only", "skip", "force_embed", "drop_index"])]
rebuild: bool,
// `--drop-index NAME`.
#[arg(long, value_name = "NAME", conflicts_with_all = ["only", "skip", "force_embed", "rebuild"])]
drop_index: Option<String>,
},
}
// Subcommands of `pond storage`.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Subcommand)]
enum StorageCmd {
// `pond storage check [URL]`: the URL is optional; per the examples, omitting it
// probes the configured destination.
#[command(after_long_help = "Examples:
pond storage check probe the configured destination
pond storage check s3+https://host/bucket/prefix probe a candidate before switching")]
Check {
url: Option<String>,
// `--format`, defaults to text.
#[arg(long, value_enum, default_value_t = OutputFormat::Text)]
format: OutputFormat,
},
// `pond storage use URL`: the URL is mandatory.
#[command(after_long_help = "Examples:
pond storage use s3+https://host/bucket/pond probe, then switch the active destination
pond storage use local roll back to the default local store")]
Use {
url: String,
},
}
// Subcommands of `pond creds`.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Subcommand)]
enum CredsCmd {
// `pond creds add [NAME] [--scope ...] [--region ...]`: every input is optional here.
Add {
name: Option<String>,
#[arg(long)]
scope: Option<String>,
#[arg(long)]
region: Option<String>,
},
// `pond creds list`.
List {
// `--format`, defaults to text.
#[arg(long, value_enum, default_value_t = OutputFormat::Text)]
format: OutputFormat,
},
// `pond creds delete NAME`.
Delete {
name: String,
},
}
// Subcommands of `pond adapters`.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Subcommand)]
enum AdaptersCmd {
// `pond adapters list`.
List {
// `--format`, defaults to text.
#[arg(long, value_enum, default_value_t = OutputFormat::Text)]
format: OutputFormat,
},
// `pond adapters discover`.
#[command(after_long_help = "Examples:
pond adapters discover probe, then pick what to enable")]
Discover,
// `pond adapters enable NAME`.
#[command(after_long_help = "Examples:
pond adapters enable claude-code
pond adapters enable codex-cli")]
Enable {
name: String,
},
// `pond adapters disable NAME`.
#[command(after_long_help = "Examples:
pond adapters disable opencode stop syncing it, keep its config entry")]
Disable {
name: String,
},
}
// Subcommands of `pond config`.
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Subcommand)]
enum ConfigCmd {
// `pond config show`.
Show {},
// `pond config path`.
Path,
// `pond config schema`: prints `DEFAULT_CONFIG_TOML`.
Schema,
}
/// Parses a retention window written as `<integer><unit>`, e.g. `30s`, `15m`, `12h`, `7d`.
///
/// Surrounding whitespace is ignored. The number must consist of ASCII digits only
/// (no sign, no decimals) and the unit must be exactly one of `s`, `m`, `h`, `d`.
///
/// # Errors
/// Returns a human-readable message when the leading number is missing or does not fit
/// an `i64`, when the unit is not recognized, or when the resulting duration is too
/// large for `chrono::Duration` to represent. The panicking `chrono::Duration`
/// constructors are avoided on purpose: this parses user input, so an oversized value
/// must be a normal error rather than a crash.
fn parse_retention(raw: &str) -> Result<chrono::Duration, String> {
    let trimmed = raw.trim();
    let split = trimmed
        .find(|c: char| !c.is_ascii_digit())
        .unwrap_or(trimmed.len());
    let (number, unit) = trimmed.split_at(split);
    // Only ASCII digits reach `parse`, so the value can never be negative.
    let amount: i64 = number
        .parse()
        .map_err(|_| format!("retention {raw:?}: leading number is not an integer"))?;
    let seconds_per_unit: i64 = match unit {
        "s" => 1,
        "m" => 60,
        "h" => 3_600,
        "d" => 86_400,
        other => {
            return Err(format!(
                "retention {raw:?}: unit {other:?} not recognized (use s/m/h/d)"
            ));
        }
    };
    // Checked multiply plus the fallible constructor: both overflow paths become `Err`.
    amount
        .checked_mul(seconds_per_unit)
        .and_then(chrono::Duration::try_seconds)
        .ok_or_else(|| format!("retention {raw:?} is too large to represent"))
}
/// clap value parser for `--project`.
///
/// `re:<pattern>` yields a regex filter; `lit:<text>` and plain text both yield a
/// substring ("contains") filter, with the `lit:` prefix stripped. Never fails.
fn parse_project_filter(input: &str) -> Result<ProjectFilter, String> {
    let filter = match input.strip_prefix("re:") {
        Some(pattern) => ProjectFilter::Regex(pattern.to_owned()),
        None => {
            // `lit:` only exists to escape text that would otherwise start with `re:`.
            let literal = input.strip_prefix("lit:").unwrap_or(input);
            ProjectFilter::Contains(literal.to_owned())
        }
    };
    Ok(filter)
}
// Output formats shared by the commands that take `--format` (text is the default everywhere it is used here).
// (Kept as `//` so clap's derive does not pick the text up as help output.)
#[derive(Debug, Clone, Copy, ValueEnum)]
enum OutputFormat {
// Human-readable rendering.
Text,
// JSON rendering.
Json,
}
/// Defaults `LANCE_MEM_POOL_SIZE` to 1 GiB (1073741824 bytes) unless the caller's
/// environment already sets it; an existing value is never overridden.
fn raise_lance_mem_pool() {
    const KEY: &str = "LANCE_MEM_POOL_SIZE";
    if std::env::var_os(KEY).is_none() {
        // NOTE(review): `set_var` is only sound while no other thread reads or writes
        // the process environment; confirm the call site guarantees that.
        #[allow(unsafe_code)]
        unsafe {
            std::env::set_var(KEY, "1073741824");
        }
        tracing::debug!("LANCE_MEM_POOL_SIZE defaulted to 1 GiB");
    }
}
/// Defaults `LANCE_DEFAULT_IO_BUFFER_SIZE` to 256 MiB (268435456 bytes) for the serving
/// commands unless the caller's environment already sets it; an existing value is never overridden.
fn cap_serve_io_buffer() {
    const KEY: &str = "LANCE_DEFAULT_IO_BUFFER_SIZE";
    if std::env::var_os(KEY).is_none() {
        // NOTE(review): `set_var` is only sound while no other thread reads or writes
        // the process environment; confirm the call site guarantees that.
        #[allow(unsafe_code)]
        unsafe {
            std::env::set_var(KEY, "268435456");
        }
        tracing::debug!("LANCE_DEFAULT_IO_BUFFER_SIZE defaulted to 256 MiB for serving");
    }
}
/// Pause between two background rowmap refreshes in `spawn_prewarm`: 30 seconds.
const ROWMAP_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
fn spawn_prewarm(store: Arc<Store>) {
let cache_dir = default_cache_dir();
tokio::spawn(async move {
let started = std::time::Instant::now();
tracing::info!("prewarm: warming search indices");
match store.prewarm(&cache_dir).await {
Ok(()) => tracing::info!(
elapsed_ms = started.elapsed().as_millis() as u64,
"prewarm: search indices warm"
),
Err(error) => {
tracing::warn!(%error, "prewarm: failed; first query will pay the cold load");
}
}
loop {
tokio::time::sleep(ROWMAP_REFRESH_INTERVAL).await;
if let Err(error) = store.ensure_rowmap(&cache_dir).await {
tracing::debug!(%error, "rowmap refresh skipped");
}
store.prune_index_cache(&cache_dir).await;
}
});
}
/// Raises the soft `RLIMIT_NOFILE` towards `target`, never above the hard limit and
/// never lowering it.
///
/// # Errors
/// Fails when the limit cannot be read or, if a raise is needed, cannot be set.
#[cfg(unix)]
fn try_raise_fd_limit(target: u64) -> anyhow::Result<()> {
    use rlimit::{Resource, getrlimit, setrlimit};

    let (current_soft, hard) = getrlimit(Resource::NOFILE).context("getrlimit(RLIMIT_NOFILE)")?;
    // The hard limit is the ceiling an unprivileged process may raise its soft limit to.
    let desired_soft = target.min(hard);
    if desired_soft > current_soft {
        setrlimit(Resource::NOFILE, desired_soft, hard).context("setrlimit(RLIMIT_NOFILE)")?;
        tracing::info!(
            target: "pond::perf",
            soft = desired_soft,
            hard,
            "raised RLIMIT_NOFILE to clear the FTS index-merge EMFILE class"
        );
    }
    Ok(())
}
/// Non-unix stub: there is no `RLIMIT_NOFILE` to adjust here, so this is a no-op
/// that always succeeds and ignores `_target`.
#[cfg(not(unix))]
fn try_raise_fd_limit(_target: u64) -> anyhow::Result<()> {
Ok(())
}
/// CLI entry point: parses arguments, sets up tracing and process-level limits, then
/// dispatches the selected subcommand.
///
/// Failures are normally propagated as `anyhow` errors. A few arms terminate the process
/// directly instead: `optimize` exits 1 when an index stage reports failed indices,
/// `search` / `get` exit 1 when their render helper returns `false`, and `sql` exits 2
/// on a query error.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Install the human-panic hook first so any later panic is reported through it.
human_panic::setup_panic!();
let cli = Cli::parse();
init_tracing(cli.verbose.tracing_level_filter());
// Best effort: a failed descriptor-limit bump is only logged at debug level.
if let Err(error) = try_raise_fd_limit(65_536) {
tracing::debug!("RLIMIT_NOFILE bump skipped: {error}");
}
// NOTE(review): this mutates the process environment (see `raise_lance_mem_pool`) after
// the runtime created by `#[tokio::main]` already exists; confirm no other thread touches
// the environment concurrently.
raise_lance_mem_pool();
// True when the CLI verbosity resolves to INFO or a more verbose level.
let verbose = cli.verbose.tracing_level_filter() >= tracing::level_filters::LevelFilter::INFO;
let StoreArgs {
storage_path,
config,
} = cli.store;
match cli.command {
Command::Init(args) => init::run(args, storage_path, config).await?,
Command::Status {
include_subagents,
hosts,
format,
} => {
// The whole command runs inside one async block so that, in JSON mode, a failure
// can also be emitted as a JSON error before it is propagated.
let outcome: anyhow::Result<()> = async {
let loaded = Config::load(config_path(config))?;
let (resolved, store) = open_store(storage_path, &loaded, false, false).await?;
let store_key = pond::substrate::store_key(resolved.lance_url());
if !store.initialized().await? {
// Store not initialized yet: render the "empty" variants of the status output.
// A failure to resolve adapters is treated as "no adapters".
let has_adapters = loaded
.resolve_adapters(None)
.map(|adapters| !adapters.is_empty())
.unwrap_or(false);
match format {
OutputFormat::Json => output(&status_json_empty(&resolved)?)?,
OutputFormat::Text => {
render_empty_status("pond status", &resolved, has_adapters)?;
output(&crate::schedule::status_line())?;
if hosts {
output_err(&pond::output::paint(
"(--hosts has nothing to report until the first sync populates the store)",
pond::output::dim(),
))?;
}
}
}
} else {
// Optional queries resolve to `Ok(None)` when not requested, so they can
// join the same `try_join!` as the mandatory ones.
let embedding_fut = async {
if verbose {
store.embedding_progress().await.map(Some)
} else {
Ok(None)
}
};
let hosts_fut = async {
if hosts {
store.ingest_host_activity().await.map(Some)
} else {
Ok(None)
}
};
// Run all store queries concurrently; the first error aborts the lot.
let (
sizes,
(sessions, messages, parts),
names,
index_status,
embedding,
host_activity,
) = tokio::try_join!(
store.table_sizes(),
store.row_counts(),
store.adapter_names(include_subagents),
store.index_status_indexable(),
embedding_fut,
hosts_fut,
)?;
let totals = RowTotals {
sessions: sessions as u64,
messages: messages as u64,
parts: parts as u64,
};
let checks = StatusChecks {
totals: &totals,
adapter_count: names.len(),
index_status: &index_status,
embedding,
};
let local = local_status(
&loaded,
&store,
&store_key,
crate::schedule::status_snapshot(),
)
.await;
match format {
OutputFormat::Json => {
output(&status_json(
&resolved,
&sizes,
&checks,
&local,
host_activity.as_deref(),
)?)?;
}
OutputFormat::Text => {
render_status_header("pond status", &resolved, &sizes, &totals)?;
render_status_checks(&checks)?;
render_local_status(&local)?;
if let Some(host_activity) = &host_activity {
render_host_activity(host_activity)?;
}
// Hint (on stderr) about adapters that were detected but are not configured.
let probes = adapter::probe_unconfigured(&loaded.adapters);
if !probes.is_empty() {
let names: Vec<&str> = probes.iter().map(|c| c.name.as_str()).collect();
output_err(&pond::output::paint(
&format!(
"hint detected but not enabled: {} - run `pond adapters discover` to enable",
names.join(", "),
),
pond::output::dim(),
))?;
}
}
}
}
Ok(())
}
.await;
// In JSON mode the error is emitted as JSON first, then still returned below.
if matches!(format, OutputFormat::Json)
&& let Err(error) = &outcome
{
emit_json_error(error)?;
}
outcome?;
}
Command::Sync {
adapter,
path,
verify,
dry_run,
no_wait,
format,
} => {
let config_file = config_path(config);
let loaded = Config::load(&config_file)?;
run_sync(
&loaded,
&config_file,
storage_path,
SyncInvocation {
adapter,
path,
verify,
dry_run,
no_wait,
format,
},
)
.await?;
}
Command::Optimize {
only,
skip,
force_embed,
rebuild,
drop_index,
} => {
let cmd_started = std::time::Instant::now();
let loaded = Config::load(config_path(config))?;
let (_, store) = open_store(storage_path, &loaded, true, false).await?;
// Three mutually exclusive modes (enforced by clap conflicts): drop one index,
// full rebuild, or the regular staged embed/index run.
if let Some(name) = drop_index {
store
.drop_index_by_name(&name)
.await
.with_context(|| format!("drop_index({name}) failed"))?;
output(&format!("optimize: dropped index {name}"))?;
} else if rebuild {
let policy = configured_maintenance_policy(&loaded, None)?;
run_embed_stage(&store, force_embed).await?;
let (progress, bar) = optimize_progress_bar();
let result = store.rebuild_indices(None, Some(progress)).await;
// Clear the progress bar before surfacing a possible rebuild error.
bar.finish_and_clear();
result.context("rebuild_indices failed")?;
store
.cleanup_old_versions(policy.cleanup_older_than)
.await
.context("cleanup after rebuild failed")?;
output("optimize: indexes rebuilt, old segments reclaimed")?;
} else {
let stages = OptimizeStages::resolve(only, &skip)?;
let policy = configured_maintenance_policy(&loaded, None)?;
// Both stages selected: run the combined `finalize_indexes` path; otherwise run
// whichever single stage was selected.
let outcome = if stages.embed && stages.index {
let started = std::time::Instant::now();
let outcome = finalize_indexes(&store, &policy, force_embed).await?;
output(&stage_line(
started.elapsed(),
"indexes",
"embed + fold complete",
))?;
Some(outcome)
} else {
if stages.embed {
let started = std::time::Instant::now();
run_embed_stage(&store, force_embed).await?;
output(&stage_line(started.elapsed(), "embed", "backlog complete"))?;
}
if stages.index {
let started = std::time::Instant::now();
let outcome = run_update_indexes_stage(&store, &policy).await?;
output(&stage_line(
started.elapsed(),
"fold",
"indexes folded + compacted",
))?;
Some(outcome)
} else {
None
}
};
// Exit 1 without printing the "done" line when any index failed.
if outcome.is_some_and(|outcome| outcome.any_indices_failed()) {
std::process::exit(1);
}
}
output(&format!(
"{} optimize complete in {}",
pond::output::paint("done -", pond::output::dim()),
elapsed_hms(cmd_started.elapsed()),
))?;
}
Command::Adapters { command } => {
let config_file = config_path(config);
let loaded = Config::load(&config_file)?;
run_adapters(command, &config_file, &loaded)?;
}
Command::Serve {
transport,
host,
port,
} => {
cap_serve_io_buffer();
let config = Config::load(config_path(config))?;
// The same embedder instance is shared between the store and the app state.
let embedder = Arc::new(LazyEmbedder::candle());
let store = Arc::new(
open_store(storage_path, &config, true, true)
.await?
.1
.with_embedder(embedder.clone()),
);
let state = AppState {
store,
embedder,
search: config.search.clone(),
};
spawn_prewarm(state.store.clone());
match transport {
ServeTransport::Http => {
// This line is printed before `serve` is entered, i.e. before any bind result is known.
output(&format!("serve: http listening on http://{host}:{port}"))?;
transport::http::serve(state, host, port).await?;
}
ServeTransport::Stdio => {
// Status goes to stderr: stdout carries the JSON-RPC stream.
eprintln!("serve: stdio MCP ready; stdout is reserved for JSON-RPC");
transport::mcp::serve_stdio(state).await?;
}
}
}
Command::Mcp {} => {
// Same setup as `serve --transport stdio`, without the stderr banner.
cap_serve_io_buffer();
let config = Config::load(config_path(config))?;
let embedder = Arc::new(LazyEmbedder::candle());
let store = Arc::new(
open_store(storage_path, &config, true, true)
.await?
.1
.with_embedder(embedder.clone()),
);
spawn_prewarm(store.clone());
transport::mcp::serve_stdio(AppState {
store,
embedder,
search: config.search.clone(),
})
.await?;
}
Command::Search {
query,
namespace,
limit,
mode,
sort_by,
project,
session_id,
from_date,
to_date,
min_score,
explain,
format,
} => {
let loaded = Config::load(config_path(config))?;
let (_, store) = open_store(storage_path, &loaded, false, true).await?;
let embedder = LazyEmbedder::candle();
// Omitted `--mode` / `--sort-by` fall back to the wire types' defaults.
let request = SearchRequest {
protocol_version: PROTOCOL_VERSION,
namespace: Some(namespace),
query,
mode: mode.map(SearchModeWire::from).unwrap_or_default(),
sort_by: sort_by.map(wire::SortBy::from).unwrap_or_default(),
filters: SearchFilters {
project,
session_id,
from_date,
to_date,
min_score,
},
limit,
};
// `--explain` prints the plans and returns without running the search;
// `--format` is not consulted on this path.
if explain {
let plans = explain_search(&store, &embedder, &request, &loaded.search).await?;
output(&plans)?;
return Ok(());
}
// Best effort: a missing or unreadable rowmap is only logged at debug level.
if let Err(error) = store.load_rowmap_if_present(&default_cache_dir()).await {
tracing::debug!(%error, "rowmap load skipped; hydration uses take_rows");
}
let envelope =
handlers::pond_search(&store, &embedder, request.clone(), &loaded.search).await;
if !render_search_envelope(format, &envelope, &request)? {
std::process::exit(1);
}
}
Command::Get {
session_id,
message_id,
namespace,
session_limit,
session_from,
session_after_message_id,
session_before_message_id,
message_context_before,
message_context_after,
format,
} => {
let loaded = Config::load(config_path(config))?;
let (_, store) = open_store(storage_path, &loaded, false, false).await?;
let request = GetRequest {
protocol_version: PROTOCOL_VERSION,
namespace: Some(namespace),
session_id,
message_id,
session_limit,
session_from: SessionFrom::from(session_from),
session_after_message_id,
session_before_message_id,
message_context_before,
message_context_after,
};
let envelope = handlers::pond_get(&store, request.clone()).await;
// The subagents footer is only built for a successful, un-paged session fetch
// (no message id, no before/after cursor) that has child sessions; a failed
// child lookup silently leaves the footer empty.
let mut subagents_footer = String::new();
if let GetEnvelope::Success(response) = &envelope
&& request.message_id.is_none()
&& request.session_after_message_id.is_none()
&& request.session_before_message_id.is_none()
&& let Ok(children) = store.child_sessions(&response.session.id).await
&& !children.is_empty()
{
subagents_footer =
pond::render::render_subagents_footer(&children, pond::render::Surface::Cli);
}
if !render_get_envelope(format, &envelope, &request, &subagents_footer)? {
std::process::exit(1);
}
}
Command::Config { command } => {
run_config_command(command, storage_path, config).await?;
}
Command::Schedule { command } => schedule::run(command)?,
Command::Completions { shell } => {
clap_complete::generate(shell, &mut Cli::command(), "pond", &mut io::stdout());
}
Command::Skill => {
// `SKILL.md` is embedded at compile time and written to stdout verbatim.
use std::io::Write;
io::stdout().write_all(include_str!("../SKILL.md").as_bytes())?;
}
Command::Storage { command } => run_storage_command(command, storage_path, config).await?,
Command::Creds { command } => {
let config_file = config_path(config);
let loaded = Config::load(&config_file)?;
run_creds(command, &config_file, &loaded)?;
}
Command::Copy {
from,
to,
verify_only,
} => {
run_copy(from, to, verify_only, storage_path, config).await?;
}
Command::Sql {
sql,
format,
limit,
output_file,
} => {
// Reject parquet-to-stdout before any config or store work is done.
if matches!(format, CliSqlFormat::Parquet) && output_file.is_none() {
bail!(
"--format parquet requires --output-file <path> (binary, can't go to stdout)"
);
}
let loaded = Config::load(config_path(config))?;
let (_, store) = open_store(storage_path, &loaded, false, false).await?;
let mode = match format {
CliSqlFormat::Text => pond::sql::Mode::Inline,
CliSqlFormat::Ndjson => pond::sql::Mode::Export(pond::sql::Format::Ndjson),
CliSqlFormat::Parquet => pond::sql::Mode::Export(pond::sql::Format::Parquet),
};
let inline_rows = limit.min(pond::sql::MAX_INLINE_ROWS);
use pond::substrate::Table;
// Open, concurrently, only the datasets that `mentions_table` reports for the query text.
let (sessions, messages, parts) = tokio::try_join!(
async {
anyhow::Ok(match pond::sql::mentions_table(&sql, "sessions") {
true => Some(store.dataset(Table::Sessions).await?),
false => None,
})
},
async {
anyhow::Ok(match pond::sql::mentions_table(&sql, "messages") {
true => Some(store.dataset(Table::Messages).await?),
false => None,
})
},
async {
anyhow::Ok(match pond::sql::mentions_table(&sql, "parts") {
true => Some(store.dataset(Table::Parts).await?),
false => None,
})
},
)?;
let tables = pond::sql::Tables {
sessions,
messages,
parts,
};
match pond::sql::run(&tables, &sql, mode, inline_rows).await {
// NOTE(review): inline text always goes to stdout; `--output-file` is not consulted
// on this path. Confirm that is intended.
Ok(pond::sql::Outcome::Inline(text)) => {
output(&text)?;
}
Ok(pond::sql::Outcome::Export {
bytes,
format: _,
rows,
columns: _,
}) => match output_file {
Some(path) => {
fs::write(&path, &bytes)
.with_context(|| format!("write export to {}", path.display()))?;
// The summary goes to stderr so stdout stays clean.
output_err(&format!(
"{} {} row(s), {} bytes -> {}",
pond::output::paint("export:", pond::output::dim()),
rows,
bytes.len(),
path.display()
))?;
}
None => {
use std::io::Write;
io::stdout().write_all(&bytes)?;
}
},
// Query errors are reported on stderr with exit code 2; infrastructure errors
// are propagated as regular errors.
Err(pond::sql::SqlError::Query(message)) => {
output_err(&format!(
"{} {}",
pond::output::paint("sql error:", pond::output::dim()),
sql_error_for_cli(&message),
))?;
std::process::exit(2);
}
Err(pond::sql::SqlError::Infra(error)) => {
return Err(error);
}
}
}
}
Ok(())
}
/// Installs the global tracing subscriber.
///
/// The filter comes from the default `EnvFilter` environment variable when that parses;
/// otherwise it is built from `cli_level` plus fixed `error`-level overrides for a few
/// noisy targets. Log lines are written through the indicatif layer's stderr writer,
/// which is what lets them coexist with progress bars.
fn init_tracing(cli_level: tracing::level_filters::LevelFilter) {
    let filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new(format!(
            "{cli_level},lance::index::vector::builder=error,aws_config=error,lance::object_store::throttle=error"
        )),
    };
    let progress_layer = IndicatifLayer::new();
    let stderr_writer = progress_layer.get_stderr_writer();
    let fmt_layer = fmt::layer().with_writer(stderr_writer);
    tracing_subscriber::registry()
        .with(filter)
        .with(fmt_layer)
        .with(progress_layer)
        .init();
}
/// Emits one line of regular command output by delegating to `pond::output::line`.
///
/// # Errors
/// Propagates whatever `pond::output::line` returns.
#[allow(clippy::print_stdout)]
fn output(message: &str) -> anyhow::Result<()> {
pond::output::line(message)
}
/// Emits one line of diagnostic output (hints, summaries) by delegating to
/// `pond::output::line_err`.
///
/// # Errors
/// Propagates whatever `pond::output::line_err` returns.
fn output_err(message: &str) -> anyhow::Result<()> {
pond::output::line_err(message)
}
/// Opens the store exactly like `Store::open_with_options_cached`, showing a ticking
/// spinner while the open is in flight.
///
/// The spinner is cleared before the result — success or failure — is returned. If the
/// spinner template fails to parse, the default spinner style is used instead.
async fn open_store_with_spinner(
    location: &Url,
    storage: HashMap<String, String>,
    caps: pond::substrate::RuntimeCaps,
    index_cache_dir: Option<PathBuf>,
) -> anyhow::Result<Store> {
    let template = "{spinner:.green} opening pond store... [{elapsed_precise}]";
    let style = ProgressStyle::with_template(template)
        .unwrap_or_else(|_| ProgressStyle::default_spinner());

    let spinner = ProgressBar::new_spinner();
    spinner.set_style(style);
    spinner.enable_steady_tick(Duration::from_millis(120));

    let opened = Store::open_with_options_cached(location, storage, caps, index_cache_dir).await;
    spinner.finish_and_clear();
    opened
}
/// Builds the substrate `RuntimeCaps` from the `runtime` section of `config`.
fn runtime_caps(config: &Config) -> pond::substrate::RuntimeCaps {
pond::substrate::RuntimeCaps::from_config(&config.runtime)
}
/// Picks the storage location to use, in priority order:
///
/// 1. `explicit`, when the caller supplied one;
/// 2. `[storage].path` from the loaded config, parsed as a `StorageUrl`;
/// 3. the result of `default_local_storage()`.
///
/// # Errors
/// Fails when the configured path does not parse, or when the default
/// location cannot be produced.
fn resolve_storage_location(
    explicit: Option<StorageUrl>,
    loaded: &Config,
) -> anyhow::Result<StorageUrl> {
    match (explicit, loaded.storage.path.as_ref()) {
        (Some(storage), _) => Ok(storage),
        (None, Some(configured)) => {
            StorageUrl::parse(configured).context("invalid [storage].path in config")
        }
        (None, None) => default_local_storage(),
    }
}
async fn open_store(
explicit: Option<StorageUrl>,
loaded: &Config,
spinner: bool,
index_cache: bool,
) -> anyhow::Result<(ResolvedStorage, Store)> {
let storage = resolve_storage_location(explicit, loaded)?;
let resolved = storage.resolve(&loaded.creds)?;
warn_unmatched_sets(&[&resolved], loaded)?;
let index_cache_dir = index_cache.then(default_cache_dir);
let store = if spinner {
open_store_with_spinner(
resolved.lance_url(),
resolved.options.clone(),
runtime_caps(loaded),
index_cache_dir,
)
.await?
} else {
Store::open_with_options_cached(
resolved.lance_url(),
resolved.options.clone(),
runtime_caps(loaded),
index_cache_dir,
)
.await?
};
Ok((resolved, store))
}
/// Emits one dimmed hint line for every credential set reported by
/// `pond::substrate::unmatched_creds_sets` for the given resolved storages.
fn warn_unmatched_sets(resolved: &[&ResolvedStorage], loaded: &Config) -> anyhow::Result<()> {
    let unmatched = pond::substrate::unmatched_creds_sets(resolved, &loaded.creds);
    for name in unmatched {
        let hint = format!("hint creds set [{name}] matched no storage URL in this invocation");
        output_err(&pond::output::paint(&hint, pond::output::dim()))?;
    }
    Ok(())
}
/// Returns the config file path: `explicit` when given, otherwise the default
/// computed by `pond::config::default_config_path` from `XDG_CONFIG_HOME` and
/// `HOME`.
fn config_path(explicit: Option<PathBuf>) -> PathBuf {
    // The environment is only consulted when no explicit path was supplied.
    explicit.unwrap_or_else(|| {
        let xdg_config_home = std::env::var_os("XDG_CONFIG_HOME").map(PathBuf::from);
        let home = std::env::var_os("HOME").map(PathBuf::from);
        pond::config::default_config_path(xdg_config_home, home)
    })
}
/// Handles the `ConfigCmd` subcommands.
///
/// * `Schema` prints `DEFAULT_CONFIG_TOML` with trailing whitespace trimmed.
/// * `Path` prints the config file path chosen by `config_path`.
/// * `Show` loads the config with provenance and prints, in order: the config
///   path (flagged when the file is absent), the resolved storage location with
///   its source and credential binding, the precedence ladder, a table of every
///   flattened setting with its redacted value and source, and finally any
///   unmatched-credential-set hints.
///
/// # Errors
/// Propagates config loading, storage resolution, serialization and output errors.
async fn run_config_command(
command: ConfigCmd,
storage_path: Option<StorageUrl>,
config: Option<PathBuf>,
) -> anyhow::Result<()> {
use pond::output::{dim, paint};
match command {
ConfigCmd::Schema => output(DEFAULT_CONFIG_TOML.trim_end()),
ConfigCmd::Path => output(&config_path(config).display().to_string()),
ConfigCmd::Show {} => {
let path = config_path(config);
let (loaded, figment) = Config::load_with_provenance(&path)?;
output(&format!(
"{} {}{}",
paint("config ", dim()),
pond::config::contract_home(&path).display(),
if path.exists() {
""
} else {
" (absent - defaults + env)"
},
))?;
let storage = resolve_storage_location(storage_path.clone(), &loaded)?;
let resolved = storage.resolve(&loaded.creds)?;
// Work out where the storage location came from. When the caller handed us
// a value, the raw argv is scanned for `--storage-path` to tell a CLI flag
// apart from any other origin (reported as "env").
let storage_source = match &storage_path {
Some(_) => {
if std::env::args()
.any(|arg| arg == "--storage-path" || arg.starts_with("--storage-path="))
{
"cli"
} else {
"env"
}
}
None if loaded.storage.path.is_some() => classify_source(&figment, "storage.path"),
None => "default",
};
output(&format!(
"{} {} ({}) -> {}",
paint("storage", dim()),
resolved.display(),
storage_source,
describe_binding_with_source(&resolved.binding, &figment),
))?;
output(&paint(
"ladder cli flag > POND_* env > config file > ambient cloud chain > built-in defaults",
dim(),
))?;
output("")?;
let mut table = new_table();
table.set_header(
["setting", "value", "source"]
.map(|h| Cell::new(h).add_attributes(vec![Attribute::Dim, Attribute::Bold])),
);
// Serialize the whole config to JSON so it can be flattened into dotted keys.
let value = serde_json::to_value(&loaded).context("serialize config for display")?;
let mut rows = Vec::new();
flatten_config(String::new(), &value, &mut rows);
for (key, raw) in rows {
let source = classify_source(&figment, &key);
table.add_row([
key.clone(),
// Secret-looking values are masked before display.
redact_config_value(&key, &raw),
source.to_owned(),
]);
}
output(&table.to_string())?;
warn_unmatched_sets(&[&resolved], &loaded)?;
Ok(())
}
}
}
/// Maps a storage-check failure to the process exit code used by the CLI:
/// `Io` -> 1, `NoCreds` -> 3, `Auth` -> 4, `OccUnsupported` -> 5.
fn check_failure_exit_code(failure: &CheckFailure) -> i32 {
    let code = match failure {
        CheckFailure::Io { .. } => 1,
        CheckFailure::NoCreds { .. } => 3,
        CheckFailure::Auth { .. } => 4,
        CheckFailure::OccUnsupported { .. } => 5,
    };
    code
}
/// Returns the short machine-readable class name for a storage-check failure
/// (`"io"`, `"no_creds"`, `"auth"` or `"occ_unsupported"`).
fn check_failure_class(failure: &CheckFailure) -> &'static str {
    let class = match failure {
        CheckFailure::Io { .. } => "io",
        CheckFailure::NoCreds { .. } => "no_creds",
        CheckFailure::Auth { .. } => "auth",
        CheckFailure::OccUnsupported { .. } => "occ_unsupported",
    };
    class
}
/// Renders the result of a storage check as pretty-printed JSON.
///
/// The document has the keys `url`, `ok` (true exactly when `exit_code` is 0),
/// `exit_code`, `binding` and `failure`. `failure` is `null` when no failure is
/// given, otherwise an object with `class` and `message`.
///
/// # Errors
/// Fails only if JSON serialization fails.
fn storage_check_json(
    url: &str,
    binding: Option<&str>,
    exit_code: i32,
    failure: Option<(&str, String)>,
) -> anyhow::Result<String> {
    let succeeded = exit_code == 0;
    let failure_doc = failure.map(|(class, message)| {
        serde_json::json!({
            "class": class,
            "message": message,
        })
    });
    let report = serde_json::json!({
        "url": url,
        "ok": succeeded,
        "exit_code": exit_code,
        "binding": binding,
        "failure": failure_doc,
    });
    serde_json::to_string_pretty(&report).context("serialize storage check as JSON")
}
/// Prints the failure's concise cause (when it has one) as a dimmed
/// `cause: ...` diagnostic line, then logs the full failure at debug level.
fn render_check_cause(failure: &CheckFailure) -> anyhow::Result<()> {
    failure
        .concise_cause()
        .map(|cause| {
            let line = format!("cause: {cause}");
            output_err(&pond::output::paint(&line, pond::output::dim()))
        })
        .transpose()?;
    tracing::debug!("full probe failure: {failure:?}");
    Ok(())
}
/// Handles the `StorageCmd` subcommands after loading the config file.
///
/// * `Check` resolves a storage location (the given `url`, else the
///   configured/default one), runs `pond::substrate::storage_check` against it
///   and reports the outcome as JSON or text.
/// * `Use` delegates to `run_storage_use`.
///
/// This function terminates the process directly on failure paths: exit code 2
/// when the URL does not parse, and `check_failure_exit_code(..)` when the
/// probe fails.
async fn run_storage_command(
command: StorageCmd,
storage_path: Option<StorageUrl>,
config: Option<PathBuf>,
) -> anyhow::Result<()> {
use pond::output::{dim, paint};
let config_file = config_path(config);
let loaded = Config::load(&config_file)?;
match command {
StorageCmd::Check { url, format } => {
let storage = match url {
Some(raw) => match parse_storage_path(&raw) {
Ok(parsed) => parsed,
// A parse failure is reported in the requested format; both formats exit with 2.
Err(error) => match format {
OutputFormat::Json => {
output(&storage_check_json(
&raw,
None,
2,
Some(("parse", format!("{error:#}"))),
)?)?;
std::process::exit(2);
}
OutputFormat::Text => {
output_err(&format!("check: parse error: {error:#}"))?;
std::process::exit(2);
}
},
},
None => resolve_storage_location(storage_path, &loaded)?,
};
let resolved = storage.resolve(&loaded.creds)?;
match format {
// JSON mode: exactly one JSON document goes to the primary output, success or failure.
OutputFormat::Json => {
let url = resolved.display();
let binding = resolved.binding.describe();
match pond::substrate::storage_check(&resolved).await {
Ok(()) => output(&storage_check_json(&url, Some(&binding), 0, None)?)?,
Err(failure) => {
let code = check_failure_exit_code(&failure);
output(&storage_check_json(
&url,
Some(&binding),
code,
Some((check_failure_class(&failure), failure.to_string())),
)?)?;
std::process::exit(code);
}
}
}
// Text mode: print the location and binding first, then the probe outcome.
OutputFormat::Text => {
output(&format!(
"{} {}",
resolved.display(),
paint(&format!("[{}]", resolved.binding.describe()), dim()),
))?;
warn_unmatched_sets(&[&resolved], &loaded)?;
match pond::substrate::storage_check(&resolved).await {
Ok(()) => {
output(
"check: ok - conditional put (OCC), read-back, and delete all succeeded",
)?;
}
Err(failure) => {
output_err(&format!("check: {failure}"))?;
render_check_cause(&failure)?;
std::process::exit(check_failure_exit_code(&failure));
}
}
}
}
}
StorageCmd::Use { url } => {
run_storage_use(&loaded, &config_file, storage_path, &url).await?;
}
}
Ok(())
}
/// One side of a `copy` operation.
// NOTE(review): the size lint is silenced rather than boxing the large variant - confirm this is intentional.
#[allow(clippy::large_enum_variant)]
enum CopyEndpoint {
/// A pond store addressed by a storage URL.
Store(StorageUrl),
/// An archive file on disk (chosen for arguments ending in `.pond`).
Archive(PathBuf),
/// JSONL output: `Some(path)` for a `.jsonl` file, `None` for the `-` argument.
Jsonl(Option<PathBuf>),
}
/// Formats a copy endpoint for error messages: the store's display form, or
/// the path followed by `(archive)` / `(jsonl)`; a pathless JSONL endpoint is
/// shown as `- (jsonl)`.
fn describe_endpoint(endpoint: &CopyEndpoint) -> String {
    match endpoint {
        CopyEndpoint::Store(url) => url.display(),
        CopyEndpoint::Archive(path) => format!("{} (archive)", path.display()),
        CopyEndpoint::Jsonl(target) => match target {
            Some(path) => format!("{} (jsonl)", path.display()),
            None => String::from("- (jsonl)"),
        },
    }
}
/// Classifies a raw copy argument by its spelling:
///
/// * `-` is JSONL without a path;
/// * a `.pond` suffix is an archive file;
/// * a `.jsonl` suffix is a JSONL file;
/// * anything else is parsed as a storage URL.
///
/// # Errors
/// Fails when the fallback storage URL does not parse.
fn sniff_copy_endpoint(raw: &str) -> anyhow::Result<CopyEndpoint> {
    let endpoint = match raw {
        "-" => CopyEndpoint::Jsonl(None),
        archive if archive.ends_with(".pond") => CopyEndpoint::Archive(PathBuf::from(archive)),
        jsonl if jsonl.ends_with(".jsonl") => CopyEndpoint::Jsonl(Some(PathBuf::from(jsonl))),
        other => CopyEndpoint::Store(parse_storage_path(other)?),
    };
    Ok(endpoint)
}
/// Turns a raw copy argument into an endpoint.
///
/// The special argument `@` means the store selected by
/// `resolve_storage_location` (explicit storage path, config, or default);
/// everything else is classified by `sniff_copy_endpoint`.
fn resolve_copy_endpoint(
    raw: &str,
    storage_path: &Option<StorageUrl>,
    loaded: &Config,
) -> anyhow::Result<CopyEndpoint> {
    match raw {
        "@" => {
            let configured = resolve_storage_location(storage_path.clone(), loaded)?;
            Ok(CopyEndpoint::Store(configured))
        }
        literal => sniff_copy_endpoint(literal),
    }
}
/// Entry point for `copy`: resolves both endpoints and dispatches on the pair.
///
/// Supported pairs are store -> store, store -> archive, store -> JSONL and
/// archive -> store. `verify_only` is accepted for store -> store only. A local
/// source store whose path does not exist is rejected up front.
///
/// # Errors
/// Fails for unsupported endpoint pairs, a missing local source store,
/// `--verify-only` on anything but store -> store, and any error of the
/// selected copy routine.
async fn run_copy(
    from: String,
    to: String,
    verify_only: bool,
    storage_path: Option<StorageUrl>,
    config: Option<PathBuf>,
) -> anyhow::Result<()> {
    let loaded = Config::load(config_path(config))?;
    let source = resolve_copy_endpoint(&from, &storage_path, &loaded)?;
    let target = resolve_copy_endpoint(&to, &storage_path, &loaded)?;

    if let CopyEndpoint::Store(url) = &source {
        let missing_on_disk =
            pond::config::local_path(url.canonical()).is_some_and(|path| !path.exists());
        if missing_on_disk {
            bail!(
                "source store {} does not exist; check --from",
                url.display()
            );
        }
    }

    // Only the store -> store path knows how to verify.
    let store_to_store = matches!(
        (&source, &target),
        (CopyEndpoint::Store(_), CopyEndpoint::Store(_))
    );
    if verify_only && !store_to_store {
        bail!("--verify-only applies to store-to-store copies only");
    }

    match (source, target) {
        (CopyEndpoint::Store(src), CopyEndpoint::Store(dst)) => {
            run_store_to_store_copy(src, dst, verify_only, &loaded).await
        }
        (CopyEndpoint::Store(src), CopyEndpoint::Archive(archive)) => {
            copy_store_to_archive(src, &archive, &loaded).await
        }
        (CopyEndpoint::Store(src), CopyEndpoint::Jsonl(jsonl)) => {
            copy_store_to_jsonl(src, jsonl, &loaded).await
        }
        (CopyEndpoint::Archive(archive), CopyEndpoint::Store(dst)) => {
            copy_archive_to_store(&archive, dst, &loaded).await
        }
        (CopyEndpoint::Jsonl(_), CopyEndpoint::Store(_)) => bail!(
            "jsonl is an export-only target: restore from a `.pond` archive or copy from a store URL instead"
        ),
        (source, target) => bail!(
            "unsupported copy {} -> {}: at least one endpoint must be a pond store",
            describe_endpoint(&source),
            describe_endpoint(&target),
        ),
    }
}
/// Copies (or, with `verify_only`, just compares) one pond store into another.
///
/// Stages, in order: reject identical endpoints, open both stores, then either
/// verify-and-return, or plan -> stream -> finalize indexes -> verify.
///
/// Exits the process with code 6 when verification reports the stores are not
/// in sync (both in verify-only mode and after a copy).
///
/// # Errors
/// Fails when `from` and `to` are the same store, when the source has no
/// sessions/rows, or when any stage returns an error.
async fn run_store_to_store_copy(
from: StorageUrl,
to: StorageUrl,
verify_only: bool,
loaded: &Config,
) -> anyhow::Result<()> {
if from.canonical() == to.canonical() {
bail!(
"--from and --to resolve to the same store ({}); nothing to copy",
from.display(),
);
}
let (from_resolved, from_store, to_resolved, to_store) =
resolve_and_open_pair(&from, &to, loaded).await?;
// Verify-only mode: compare, report, and stop without writing anything here.
if verify_only {
let verify = verify_stores(&from_store, &to_store).await?;
ensure_source_not_empty(&verify, &from_resolved.display())?;
if !render_storage_verify(&verify, &from_resolved.display(), &to_resolved.display())?.synced
{
std::process::exit(6);
}
return Ok(());
}
let cmd_started = std::time::Instant::now();
let dim = pond::output::dim();
let spinner = ProgressBar::new_spinner();
spinner.set_style(
ProgressStyle::with_template("{spinner:.green} {msg} [{elapsed_precise}]")
.unwrap_or_else(|_| ProgressStyle::default_spinner()),
);
spinner.enable_steady_tick(Duration::from_millis(120));
// Stage 1: plan - work out which sessions the destination is missing or has outgrown.
spinner.set_message("plan comparing source <-> destination");
let plan_started = std::time::Instant::now();
let plan = to_store.plan_incremental_from(&from_store).await?;
let plan_elapsed = plan_started.elapsed();
if plan.source_sessions == 0 {
spinner.finish_and_clear();
bail!(
"source store {} has 0 sessions; check --from (a mistyped source opens as an empty store)",
from_resolved.display(),
);
}
let new_sessions = plan.new_sessions();
// Everything in the plan that is not new is counted as "grown".
let grown_sessions = plan.total().saturating_sub(new_sessions);
spinner.println(stage_line(
plan_elapsed,
"plan",
&format!(
"{} sessions to copy ({} new + {} grown, {} on source)",
plan.total(),
new_sessions,
grown_sessions,
plan.source_sessions,
),
));
// Stage 2: stream - skipped entirely when the plan is empty.
if plan.is_empty() {
spinner.println(format!(
"{} destination already up to date (0 sessions copied)",
pond::output::paint("copy:", dim),
));
} else {
spinner.set_message(format!(
"stream copying {} sessions to destination",
plan.total(),
));
let stream_started = std::time::Instant::now();
let imported = to_store.copy_delta_from(&from_store, &plan).await?;
let stream_elapsed = stream_started.elapsed();
spinner.println(stage_line(
stream_elapsed,
"stream",
&format!(
"sessions {} ({} new) messages {} ({} new) parts {} ({} new)",
imported.rows.sessions,
imported.inserted.sessions,
imported.rows.messages,
imported.inserted.messages,
imported.rows.parts,
imported.inserted.parts,
),
));
}
spinner.finish_and_clear();
// Stage 3: indexes - runs even when nothing was copied.
let indexes_started = std::time::Instant::now();
let policy = configured_maintenance_policy(loaded, None)?;
finalize_indexes(&to_store, &policy, false).await?;
output(&stage_line(
indexes_started.elapsed(),
"indexes",
"text + semantic rebuilt on destination",
))?;
// Stage 4: verify - compare the two stores and fail hard when they differ.
let verify_started = std::time::Instant::now();
let verify = verify_stores(&from_store, &to_store).await?;
let verify_elapsed = verify_started.elapsed();
if verify.synced() {
output(&stage_line(
verify_elapsed,
"verify",
&format!(
"SYNCED - {} source rows, {} duplicates",
format_thousands(verify.total_source_rows() as u64),
verify.total_duplicates(),
),
))?;
} else {
output(&stage_line(verify_elapsed, "verify", "FAILED"))?;
render_storage_verify(&verify, &from_resolved.display(), &to_resolved.display())?;
std::process::exit(6);
}
output(&format!(
"{} copy complete in {}",
pond::output::paint("done -", dim),
elapsed_hms(cmd_started.elapsed()),
))?;
Ok(())
}
/// Exports the store at `from` into the archive file at `path` and prints a
/// one-line summary with the exported session/message/part counts.
async fn copy_store_to_archive(
    from: StorageUrl,
    path: &Path,
    loaded: &Config,
) -> anyhow::Result<()> {
    let (_, store) = open_store(Some(from), loaded, false, false).await?;
    let exported = export_pond_archive(&store, path).await?;
    let rows = &exported.rows;
    let line = format!(
        "{} {} sessions={} messages={} parts={}",
        pond::output::paint("copy:", pond::output::dim()),
        path.display(),
        rows.sessions,
        rows.messages,
        rows.parts,
    );
    output(&line)
}
/// Exports the store at `from` as JSONL, either into the file at `path` or,
/// when `path` is `None`, onto standard output.
///
/// The summary line goes to the diagnostic stream when the data itself is
/// written to stdout, so the two never mix; otherwise it goes to the primary
/// output.
///
/// # Errors
/// Fails when the store cannot be opened, the target file cannot be created,
/// the export fails, or the final flush of the target fails.
async fn copy_store_to_jsonl(
    from: StorageUrl,
    path: Option<PathBuf>,
    loaded: &Config,
) -> anyhow::Result<()> {
    let (_, store) = open_store(Some(from), loaded, false, false).await?;
    let to_stdout = path.is_none();
    let summary = match path {
        Some(path) => {
            let file = tokio::fs::File::create(&path)
                .await
                .with_context(|| format!("failed to open {}", path.display()))?;
            let mut writer = tokio::io::BufWriter::new(file);
            let summary = handlers::pond_export(&store, None, &mut writer).await?;
            writer.flush().await.context("copy: flush")?;
            summary
        }
        None => {
            let mut stdout = tokio::io::stdout();
            let summary = handlers::pond_export(&store, None, &mut stdout).await?;
            // Flush explicitly, like the file branch does: without it a failed
            // or still-pending final write to stdout (for example a closed
            // pipe) would go unnoticed and the export would report success.
            stdout.flush().await.context("copy: flush")?;
            summary
        }
    };
    let line = format!(
        "{} jsonl sessions={} messages={} parts={}",
        pond::output::paint("copy:", pond::output::dim()),
        summary.sessions,
        summary.messages,
        summary.parts,
    );
    if to_stdout {
        output_err(&line)?;
    } else {
        output(&line)?;
    }
    Ok(())
}
/// Restores the archive at `path` into the store at `to`.
///
/// Prints the import counts, finalizes the destination indexes using the
/// configured maintenance policy, and reports the time taken by the index
/// stage and by the whole restore.
async fn copy_archive_to_store(path: &Path, to: StorageUrl, loaded: &Config) -> anyhow::Result<()> {
    let started = std::time::Instant::now();
    let (_, store) = open_store(Some(to), loaded, false, false).await?;
    let imported = import_pond_archive(&store, path).await?;
    render_copy_import(&imported)?;

    let policy = configured_maintenance_policy(loaded, None)?;
    let index_timer = std::time::Instant::now();
    finalize_indexes(&store, &policy, false).await?;
    let index_line = stage_line(
        index_timer.elapsed(),
        "indexes",
        "text + semantic rebuilt on destination",
    );
    output(&index_line)?;

    let done_line = format!(
        "{} restore complete in {}",
        pond::output::paint("done -", pond::output::dim()),
        elapsed_hms(started.elapsed()),
    );
    output(&done_line)
}
/// Prints a one-line summary of an archive import: the total and the newly
/// inserted counts for sessions, messages and parts.
fn render_copy_import(import: &LanceArchiveImport) -> anyhow::Result<()> {
    let total = &import.rows;
    let fresh = &import.inserted;
    let label = pond::output::paint("copy:", pond::output::dim());
    let line = format!(
        "{} sessions={} messages={} parts={} inserted_sessions={} inserted_messages={} inserted_parts={}",
        label,
        total.sessions,
        total.messages,
        total.parts,
        fresh.sessions,
        fresh.messages,
        fresh.parts,
    );
    output(&line)
}
/// Returns the text to store as `[storage].path` for `url`.
///
/// This is the URL's display form; for local URLs longer than one character,
/// all trailing `/` characters are removed.
fn storage_config_value(url: &StorageUrl) -> String {
    let shown = url.display();
    let has_trimmable_slash = url.is_local() && shown.len() > 1 && shown.ends_with('/');
    if !has_trimmable_slash {
        return shown;
    }
    shown.trim_end_matches('/').to_owned()
}
/// Sets `storage.path` in the TOML document to `path_value`.
///
/// An existing table-like `storage` item is updated in place; otherwise a
/// fresh `[storage]` table holding only `path` is inserted.
fn set_storage_path(doc: &mut toml_edit::DocumentMut, path_value: &str) {
    use toml_edit::{Item, Table, value};

    if let Some(existing) = doc.get_mut("storage").and_then(Item::as_table_like_mut) {
        existing.insert("path", value(path_value));
        return;
    }

    let mut fresh = Table::new();
    fresh.insert("path", value(path_value));
    doc.insert("storage", Item::Table(fresh));
}
/// Switches the configured storage destination to `url`.
///
/// Steps, in order: parse `url`, resolve its credentials, probe it with
/// `pond::substrate::storage_check`, and only when the probe succeeds write
/// `[storage].path` to the config file. Nothing is written when the
/// destination is already the configured one.
///
/// Exits the process with code 2 when `url` does not parse, and with
/// `check_failure_exit_code(..)` when the probe fails; in both cases the config
/// file is left unchanged.
async fn run_storage_use(
loaded: &Config,
config_file: &Path,
storage_path: Option<StorageUrl>,
url: &str,
) -> anyhow::Result<()> {
use pond::output::{dim, paint};
let dest = match parse_storage_path(url) {
Ok(parsed) => parsed,
Err(error) => {
output_err(&format!("use: parse error: {error:#}"))?;
std::process::exit(2);
}
};
let dest_resolved = dest.resolve(&loaded.creds)?;
output(&format!(
"{} {}",
dest_resolved.display(),
paint(&format!("[{}]", dest_resolved.binding.describe()), dim()),
))?;
warn_unmatched_sets(&[&dest_resolved], loaded)?;
let spinner = ProgressBar::new_spinner();
spinner.set_style(
ProgressStyle::with_template("{spinner:.green} probing destination... [{elapsed_precise}]")
.unwrap_or_else(|_| ProgressStyle::default_spinner()),
);
spinner.enable_steady_tick(Duration::from_millis(120));
let check = pond::substrate::storage_check(&dest_resolved).await;
spinner.finish_and_clear();
// A failed probe aborts before the config file is touched.
if let Err(failure) = check {
output_err(&format!(
"use: destination failed the end-to-end check; config not changed: {failure}"
))?;
render_check_cause(&failure)?;
std::process::exit(check_failure_exit_code(&failure));
}
output("check: ok - conditional put (OCC), read-back, and delete all succeeded")?;
let current = resolve_storage_location(storage_path, loaded)?;
// The config already names this destination if the stored text equals the
// canonical form, or if it parses to a URL with the same canonical form.
let already_configured = loaded.storage.path.as_deref() == Some(dest.canonical().as_str())
|| loaded.storage.path.as_deref().is_some_and(|path| {
StorageUrl::parse(path)
.map(|parsed| parsed.canonical() == dest.canonical())
.unwrap_or(false)
});
if current.canonical() == dest.canonical() && already_configured {
output(&format!(
"{} {} is already the configured destination - nothing to change",
paint("use:", dim()),
dest.display(),
))?;
return Ok(());
}
let mut doc = read_config_doc(config_file)?;
let path_value = storage_config_value(&dest);
set_storage_path(&mut doc, &path_value);
write_config_doc(config_file, &doc)?;
output(&format!(
"{} [storage].path = {} written to {}",
paint("use:", dim()),
path_value,
config::display(&config::url_for_path(config_file)?),
))?;
// Only the config changes here; no data is moved, hence the copy hint below.
output(&format!(
"{} previous data at {} is untouched; copy it over with `pond copy --from {} --to {}`",
paint("hint", dim()),
current.display(),
current.display(),
dest.display(),
))?;
Ok(())
}
fn read_config_doc(config_file: &Path) -> anyhow::Result<toml_edit::DocumentMut> {
let existing = if config_file.exists() {
fs::read_to_string(config_file)
.with_context(|| format!("failed to read {}", config_file.display()))?
} else {
String::new()
};
existing
.parse()
.with_context(|| format!("failed to parse {} as TOML", config_file.display()))
}
fn write_config_doc(config_file: &Path, doc: &toml_edit::DocumentMut) -> anyhow::Result<()> {
if let Some(parent) = config_file.parent() {
fs::create_dir_all(parent)
.with_context(|| format!("failed to create {}", parent.display()))?;
}
pond::config::write_config_file(config_file, &doc.to_string())
}
/// Dispatches a `CredsCmd` subcommand to its handler.
///
/// `Add` and `Delete` edit the config file at `config_file`; `List` only reads
/// the already-loaded config.
fn run_creds(command: CredsCmd, config_file: &Path, loaded: &Config) -> anyhow::Result<()> {
match command {
CredsCmd::Add {
name,
scope,
region,
} => creds_add(config_file, name, scope, region),
CredsCmd::List { format } => creds_list(loaded, format),
CredsCmd::Delete { name } => creds_delete(config_file, name),
}
}
/// Interactively adds (or replaces) a `[creds.<name>]` set in the config file.
///
/// Requires stdin to be a terminal. Prompts for the set name when `name` is
/// `None`, asks for confirmation before replacing an existing set, then prompts
/// for the access key ID and the (masked) secret access key. The edited
/// document is validated with `Config::load_str` before anything is written.
///
/// # Errors
/// Fails when stdin is not a terminal, the set name is invalid, a prompt
/// fails, the resulting config would be invalid, or the file cannot be
/// read/written.
fn creds_add(
config_file: &Path,
name: Option<String>,
scope: Option<String>,
region: Option<String>,
) -> anyhow::Result<()> {
use pond::output::{dim, paint};
if !io::stdin().is_terminal() {
bail!(
"`pond creds add` needs a terminal for hidden secret entry; in non-interactive runs set POND_CREDS_<NAME>_ACCESS_KEY_ID / POND_CREDS_<NAME>_SECRET_ACCESS_KEY or add [creds.<name>] to config.toml"
);
}
let name = match name {
Some(name) => name,
None => cliclack::input("Credential set name")
.default_input("default")
.interact()
.context("credential prompt failed; nothing written")?,
};
if !config::valid_creds_set_name(&name) {
bail!(config::creds_set_name_error(&name));
}
let mut doc = read_config_doc(config_file)?;
let exists = doc
.get("creds")
.and_then(toml_edit::Item::as_table_like)
.is_some_and(|creds| creds.contains_key(&name));
// Replacing an existing set needs explicit consent; the prompt defaults to "no".
if exists
&& !cliclack::confirm(format!("Replace existing credential set [creds.{name}]?"))
.initial_value(false)
.interact()
.context("credential prompt failed; nothing written")?
{
output(&format!(
"{} kept existing [creds.{}]; nothing written",
paint("creds:", dim()),
name,
))?;
return Ok(());
}
let access_key_id: String = cliclack::input("Access key ID")
.interact()
.context("credential prompt failed; nothing written")?;
let secret_access_key: String = cliclack::password("Secret access key")
.mask('*')
.interact()
.context("credential prompt failed; nothing written")?;
set_creds_set(
&mut doc,
&name,
&access_key_id,
&secret_access_key,
scope.as_deref(),
region.as_deref(),
);
// Validate the edited document before writing, so a bad edit never reaches disk.
Config::load_str(&doc.to_string())
.context("the resulting config would be invalid; nothing written")?;
write_config_doc(config_file, &doc)?;
output(&format!(
"{} [creds.{}] written to {} (secret redacted)",
paint("creds:", dim()),
name,
config::display(&config::url_for_path(config_file)?),
))?;
output(&format!(
"{} verify it binds with `pond storage check <url>`",
paint("hint", dim()),
))?;
Ok(())
}
/// Builds the display strings `(access key, secret)` for a credential set
/// without revealing inline secret material.
///
/// Inline values are shown as `********`; indirect sources are shown as
/// `file:<path>` or `command:<command>`; a missing value is shown as `-`.
/// An inline value takes precedence over a file, and a file over a command.
fn creds_display_fields(set: &config::CredsSet) -> (String, String) {
    const MASK: &str = "********";
    const ABSENT: &str = "-";

    let access = if set.access_key_id.is_some() {
        MASK.to_owned()
    } else if let Some(path) = &set.access_key_id_file {
        format!("file:{}", path.display())
    } else {
        ABSENT.to_owned()
    };

    let secret = match (
        &set.secret_access_key,
        &set.secret_access_key_file,
        &set.secret_access_key_command,
    ) {
        (Some(_), _, _) => MASK.to_owned(),
        (None, Some(path), _) => format!("file:{}", path.display()),
        (None, None, Some(command)) => format!("command:{command}"),
        (None, None, None) => ABSENT.to_owned(),
    };

    (access, secret)
}
/// Renders the configured credential sets as a pretty-printed JSON array.
///
/// Each element has `name`, `scope`, `access_key`, `secret` and `region`; the
/// key and secret fields carry the redacted display strings from
/// `creds_display_fields`.
fn creds_list_json(loaded: &Config) -> anyhow::Result<String> {
    let mut sets: Vec<serde_json::Value> = Vec::new();
    for (name, set) in &loaded.creds {
        let (access, secret) = creds_display_fields(set);
        sets.push(serde_json::json!({
            "name": name,
            "scope": set.scope,
            "access_key": access,
            "secret": secret,
            "region": set.region,
        }));
    }
    serde_json::to_string_pretty(&sets).context("serialize creds list as JSON")
}
/// Prints the configured credential sets.
///
/// JSON format prints the document from `creds_list_json`. Text format prints
/// a table (set, scope, access key, secret, region) or, when no sets exist, a
/// hint on how to add one. A missing scope is shown as `(catch-all)` and a
/// missing region as `-`.
fn creds_list(loaded: &Config, format: OutputFormat) -> anyhow::Result<()> {
    if matches!(format, OutputFormat::Json) {
        return output(&creds_list_json(loaded)?);
    }
    if loaded.creds.is_empty() {
        return output(
            "no credential sets configured - add one with `pond creds add`, or set POND_CREDS_<NAME>_* in the environment",
        );
    }

    let mut table = new_table();
    table.set_header(vec!["set", "scope", "access key", "secret", "region"]);
    for (name, set) in &loaded.creds {
        let scope = set.scope.as_deref().unwrap_or("(catch-all)").to_owned();
        let (access, secret) = creds_display_fields(set);
        let region = set.region.as_deref().unwrap_or("-").to_owned();
        table.add_row(vec![name.clone(), scope, access, secret, region]);
    }
    output(&table.to_string())
}
/// Removes `[creds.<name>]` from the config file.
///
/// The `creds` table itself is dropped when it becomes empty. After writing,
/// the config is reloaded on a best-effort basis: if the configured storage
/// path resolves to the ambient binding, a note about the fallback is printed.
/// Any failure during that follow-up check is ignored.
///
/// # Errors
/// Fails when no such set exists, or when the file cannot be read or written.
fn creds_delete(config_file: &Path, name: String) -> anyhow::Result<()> {
    use pond::output::{dim, paint};

    let mut doc = read_config_doc(config_file)?;
    let removed = match doc
        .get_mut("creds")
        .and_then(toml_edit::Item::as_table_like_mut)
    {
        Some(creds) => creds.remove(&name).is_some(),
        None => false,
    };
    if !removed {
        bail!(
            "no [creds.{name}] set in {}",
            config::display(&config::url_for_path(config_file)?)
        );
    }

    let creds_now_empty = doc
        .get("creds")
        .and_then(toml_edit::Item::as_table_like)
        .is_some_and(toml_edit::TableLike::is_empty);
    if creds_now_empty {
        doc.remove("creds");
    }

    write_config_doc(config_file, &doc)?;
    output(&format!(
        "{} [creds.{}] removed",
        paint("creds:", dim()),
        name
    ))?;

    // Best-effort follow-up: every step that fails simply ends the function.
    let Ok(reloaded) = Config::load(config_file) else {
        return Ok(());
    };
    let Some(path) = reloaded.storage.path.as_deref() else {
        return Ok(());
    };
    let Ok(url) = StorageUrl::parse(path) else {
        return Ok(());
    };
    let Ok(resolved) = url.resolve(&reloaded.creds) else {
        return Ok(());
    };
    if matches!(resolved.binding, CredsBinding::Ambient) {
        output(&format!(
            "{} active storage {} now matches no creds set; it will fall back to the ambient cloud chain",
            paint("note:", dim()),
            url.display(),
        ))?;
    }
    Ok(())
}
/// Writes a `[creds.<name>]` table into the TOML document.
///
/// The table holds, in this order, `scope` (if given), `access_key_id`,
/// `secret_access_key` and `region` (if given). An existing table-like `creds`
/// item receives the entry (replacing one of the same name); otherwise an
/// implicit `creds` parent table is created to hold it.
fn set_creds_set(
    doc: &mut toml_edit::DocumentMut,
    name: &str,
    access_key_id: &str,
    secret_access_key: &str,
    scope: Option<&str>,
    region: Option<&str>,
) {
    use toml_edit::{Item, Table, value};

    // Field order here is the key order in the written table.
    let fields = [
        ("scope", scope),
        ("access_key_id", Some(access_key_id)),
        ("secret_access_key", Some(secret_access_key)),
        ("region", region),
    ];
    let mut entry = Table::new();
    for (key, field) in fields {
        if let Some(text) = field {
            entry.insert(key, value(text));
        }
    }

    if let Some(creds) = doc.get_mut("creds").and_then(Item::as_table_like_mut) {
        creds.insert(name, Item::Table(entry));
        return;
    }

    let mut parent = Table::new();
    parent.set_implicit(true);
    parent.insert(name, Item::Table(entry));
    doc.insert("creds", Item::Table(parent));
}
/// Dispatches an `AdaptersCmd` subcommand to its handler.
///
/// `List` reads the already-loaded config; `Discover`, `Enable` and `Disable`
/// operate on the config file at `config_file`.
fn run_adapters(command: AdaptersCmd, config_file: &Path, loaded: &Config) -> anyhow::Result<()> {
match command {
AdaptersCmd::List { format } => adapters_list(loaded, format),
AdaptersCmd::Discover => adapters_discover(config_file),
AdaptersCmd::Enable { name } => adapters_enable(config_file, &name),
AdaptersCmd::Disable { name } => adapters_disable(config_file, &name),
}
}
/// Renders the adapter overview as pretty-printed JSON with two arrays:
/// `configured` (name, enabled, path for each config entry) and `detected`
/// (name, hint for each candidate from `adapter::probe_unconfigured`).
///
/// A missing or non-boolean `enabled` counts as `false`; a missing or
/// non-string `path` is `null`.
fn adapters_list_json(loaded: &Config) -> anyhow::Result<String> {
    let detected = adapter::probe_unconfigured(&loaded.adapters);

    let mut configured: Vec<serde_json::Value> = Vec::new();
    for (name, blob) in &loaded.adapters {
        let enabled = matches!(
            blob.get("enabled").and_then(serde_json::Value::as_bool),
            Some(true)
        );
        let path = blob
            .get("path")
            .and_then(serde_json::Value::as_str)
            .map(|raw| config::contract_home(Path::new(raw)).display().to_string());
        configured.push(serde_json::json!({ "name": name, "enabled": enabled, "path": path }));
    }

    let mut detected_json: Vec<serde_json::Value> = Vec::new();
    for candidate in detected.iter() {
        detected_json
            .push(serde_json::json!({ "name": candidate.name, "hint": candidate.hint }));
    }

    let overview = serde_json::json!({ "configured": configured, "detected": detected_json });
    serde_json::to_string_pretty(&overview).context("serialize adapters list as JSON")
}
/// Prints the adapter overview.
///
/// JSON format prints the document from `adapters_list_json`. Text format
/// prints a table with one row per configured adapter (enabled `yes`/`no`,
/// path or `-`) followed by one `detected` row per unconfigured candidate, or
/// a hint when there is nothing to show.
fn adapters_list(loaded: &Config, format: OutputFormat) -> anyhow::Result<()> {
    if matches!(format, OutputFormat::Json) {
        return output(&adapters_list_json(loaded)?);
    }

    let detected = adapter::probe_unconfigured(&loaded.adapters);
    if loaded.adapters.is_empty() && detected.is_empty() {
        return output(
            "no adapters configured and none detected - run `pond adapters discover` (or `pond init`)",
        );
    }

    let mut table = new_table();
    table.set_header(vec!["adapter", "enabled", "path"]);
    for (name, blob) in &loaded.adapters {
        // A missing or non-boolean `enabled` counts as disabled.
        let enabled = matches!(
            blob.get("enabled").and_then(serde_json::Value::as_bool),
            Some(true)
        );
        let path = match blob.get("path").and_then(serde_json::Value::as_str) {
            Some(raw) => config::contract_home(Path::new(raw)).display().to_string(),
            None => "-".to_owned(),
        };
        let state = if enabled { "yes" } else { "no" };
        table.add_row(vec![name.clone(), state.to_owned(), path]);
    }
    for candidate in &detected {
        table.add_row(vec![
            candidate.name.clone(),
            "detected".to_owned(),
            candidate.hint.clone(),
        ]);
    }
    output(&table.to_string())
}
/// Runs adapter discovery, lets `adapter::prompt_and_persist` choose and
/// persist the picks (told whether stdin is a terminal), and prints how many
/// adapters were enabled and their names.
fn adapters_discover(config_file: &Path) -> anyhow::Result<()> {
    use pond::output::{dim, paint};

    let candidates = adapter::discover(None);
    let interactive = io::stdin().is_terminal();
    let picks = adapter::prompt_and_persist(config_file, &candidates, interactive)?;
    let joined = picks
        .iter()
        .map(|pick| pick.name.as_str())
        .collect::<Vec<&str>>()
        .join(", ");
    let line = format!(
        "{} enabled {} adapter(s): {}",
        paint("adapters:", dim()),
        picks.len(),
        joined,
    );
    output(&line)
}
fn adapters_enable(config_file: &Path, name: &str) -> anyhow::Result<()> {
use pond::output::{dim, paint};
let known = adapter::known_names();
if !known.contains(&name) {
bail!("unknown adapter {name:?}; known: {}", known.join(", "));
}
if adapter::set_adapter_enabled(config_file, name, true)? {
output(&format!(
"{} [adapters.{}] enabled = true",
paint("adapters:", dim()),
name,
))?;
return Ok(());
}
let candidate = adapter::discover(Some(name))
.into_iter()
.find(|c| c.name == name)
.ok_or_else(|| {
anyhow::anyhow!(
"adapter {name:?} was not detected on this machine; add an [adapters.{name}] entry with its path manually, or run `pond sync {name} --path <path>` for a one-off"
)
})?;
adapter::persist_accept(config_file, &[candidate])?;
output(&format!(
"{} [adapters.{}] enabled = true (discovered)",
paint("adapters:", dim()),
name,
))?;
Ok(())
}
/// Disables the adapter `name` in the config file.
///
/// # Errors
/// Fails when `adapter::set_adapter_enabled` reports that nothing was
/// changed, or when it returns an error itself.
fn adapters_disable(config_file: &Path, name: &str) -> anyhow::Result<()> {
    use pond::output::{dim, paint};

    let changed = adapter::set_adapter_enabled(config_file, name, false)?;
    if !changed {
        bail!(
            "no [adapters.{name}] entry to disable in {}",
            config_file.display()
        );
    }
    output(&format!(
        "{} [adapters.{}] enabled = false",
        paint("adapters:", dim()),
        name,
    ))
}
/// Classifies where a config key's value came from, using figment metadata:
/// `"default"` when there is no metadata, `"env"` when the metadata name
/// contains `POND_`, and `"file"` otherwise.
fn classify_source(figment: &figment::Figment, key: &str) -> &'static str {
    let Some(metadata) = figment.find_metadata(key) else {
        return "default";
    };
    if metadata.name.contains("POND_") {
        "env"
    } else {
        "file"
    }
}
/// Describes a credential binding, adding the config source for named sets.
///
/// For `CredsBinding::Set` the description has its trailing `)` characters
/// removed and `, <source>)` appended, where the source is that of the
/// `creds.<name>` key. Every other binding is described unchanged.
fn describe_binding_with_source(binding: &CredsBinding, figment: &figment::Figment) -> String {
    let CredsBinding::Set { name, .. } = binding else {
        return binding.describe();
    };
    let source = classify_source(figment, &format!("creds.{name}"));
    let described = binding.describe();
    let unclosed = described.trim_end_matches(')');
    format!("{unclosed}, {source})")
}
/// Flattens a JSON value into `(dotted.key, text)` rows appended to `rows`.
///
/// Objects recurse, joining keys with `.`; `null` produces no row; strings are
/// emitted without quotes; every other value is emitted in its JSON text form
/// under the current prefix.
fn flatten_config(prefix: String, value: &Value, rows: &mut Vec<(String, String)>) {
    let rendered = match value {
        Value::Null => return,
        Value::Object(entries) => {
            for (key, child) in entries {
                let path = match prefix.as_str() {
                    "" => key.clone(),
                    parent => format!("{parent}.{key}"),
                };
                flatten_config(path, child, rows);
            }
            return;
        }
        Value::String(text) => text.clone(),
        scalar => scalar.to_string(),
    };
    rows.push((prefix, rendered));
}
/// Returns `value` as it should be displayed for the config key `key`.
///
/// Only the last dotted segment of `key` is inspected, case-insensitively.
/// Segments ending in `_file` or `_command` are shown as-is. Otherwise a
/// segment containing `key`, `secret`, `token` or `password` is replaced by
/// `********`. Everything else is shown unchanged.
fn redact_config_value(key: &str, value: &str) -> String {
    const MASK: &str = "********";
    const SENSITIVE: [&str; 4] = ["key", "secret", "token", "password"];

    let last_segment = match key.rsplit_once('.') {
        Some((_, tail)) => tail,
        None => key,
    };
    let field = last_segment.to_ascii_lowercase();

    let is_indirect = field.ends_with("_file") || field.ends_with("_command");
    let looks_secret = SENSITIVE.iter().any(|needle| field.contains(needle));

    if !is_indirect && looks_secret {
        MASK.to_owned()
    } else {
        value.to_owned()
    }
}
/// Resolves credentials for both copy endpoints, prints a `from:` and a `to:`
/// line (location plus binding), and opens both stores.
///
/// Returns `(from_resolved, from_store, to_resolved, to_store)`. Both
/// endpoints are resolved before either store is opened, and the source store
/// is opened first.
async fn resolve_and_open_pair(
    from: &StorageUrl,
    to: &StorageUrl,
    loaded: &Config,
) -> anyhow::Result<(ResolvedStorage, Store, ResolvedStorage, Store)> {
    let from_resolved = from.resolve(&loaded.creds)?;
    let to_resolved = to.resolve(&loaded.creds)?;
    warn_unmatched_sets(&[&from_resolved, &to_resolved], loaded)?;

    let dim = pond::output::dim();
    let announce = |label: &str, resolved: &ResolvedStorage| {
        output(&format!(
            "{} {} {}",
            pond::output::paint(label, dim),
            resolved.display(),
            pond::output::paint(&format!("[{}]", resolved.binding.describe()), dim),
        ))
    };
    announce("from:", &from_resolved)?;
    announce("to:", &to_resolved)?;

    let from_store = Store::open_with_options(
        from_resolved.lance_url(),
        from_resolved.options.clone(),
        runtime_caps(loaded),
    )
    .await?;
    let to_store = Store::open_with_options(
        to_resolved.lance_url(),
        to_resolved.options.clone(),
        runtime_caps(loaded),
    )
    .await?;

    Ok((from_resolved, from_store, to_resolved, to_store))
}
/// Verification result for a single table when comparing a source store with a
/// destination store.
struct TableVerify {
// Which table this result is for.
table: substrate::Table,
// Row count on the source, as reported by the primary-key diff.
source_rows: usize,
// Source rows whose key was not found on the destination.
missing: usize,
// Destination rows in excess of its distinct keys.
dest_duplicates: usize,
}
/// Overall result of comparing two stores: one entry per verified table plus
/// the state of the destination's search indexes.
struct StorageVerify {
// Per-table comparison results.
tables: Vec<TableVerify>,
// Index readiness on the destination.
dest_indexes: IndexCoverage,
}
impl StorageVerify {
    /// True when no table has missing rows or destination duplicates.
    fn synced(&self) -> bool {
        !self
            .tables
            .iter()
            .any(|t| t.missing != 0 || t.dest_duplicates != 0)
    }

    /// Total number of source rows missing from the destination, over all tables.
    fn total_missing(&self) -> usize {
        self.tables.iter().fold(0, |acc, t| acc + t.missing)
    }

    /// Total number of duplicate rows on the destination, over all tables.
    fn total_duplicates(&self) -> usize {
        self.tables.iter().fold(0, |acc, t| acc + t.dest_duplicates)
    }

    /// Total number of source rows, over all tables.
    fn total_source_rows(&self) -> usize {
        self.tables.iter().fold(0, |acc, t| acc + t.source_rows)
    }
}
/// Readiness of the destination's search indexes.
#[derive(Debug, Clone, Copy)]
struct IndexCoverage {
    /// The full-text index exists.
    fts_present: bool,
    /// The vector index exists, or too few rows are embedded for it to be
    /// required yet.
    vector_present_or_below_activation: bool,
}

impl IndexCoverage {
    /// True only when both the full-text and the vector condition hold.
    fn ready(self) -> bool {
        let Self {
            fts_present,
            vector_present_or_below_activation,
        } = self;
        fts_present && vector_present_or_below_activation
    }
}
/// Result of rendering a store verification; callers use `synced` to decide
/// whether to exit with a failure code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct VerifyOutcome {
// True when the verification found the destination in sync with the source.
synced: bool,
}
/// Compares the source store `from` with the destination store `to`.
///
/// For each of the sessions, messages and parts tables, the destination's
/// composite primary-key index is built and the source is diffed against it.
/// The three table checks and the destination's index-status and
/// embedding-progress queries are awaited together via `tokio::try_join!`.
///
/// The vector index counts as covered when it exists or when fewer than
/// `VECTOR_INDEX_ACTIVATION_ROWS` rows are embedded on the destination.
async fn verify_stores(from: &Store, to: &Store) -> anyhow::Result<StorageVerify> {
    use substrate::Table;

    let check_table = |table: Table| async move {
        let (dest_keys, dest_rows) = to.composite_pk_index(table).await?;
        // Rows beyond the number of distinct keys are duplicates.
        let dest_duplicates = dest_rows - dest_keys.len();
        let (source_rows, missing) = from.composite_pk_diff_against(table, &dest_keys).await?;
        anyhow::Ok(TableVerify {
            table,
            source_rows,
            missing,
            dest_duplicates,
        })
    };

    let (sessions, messages, parts, index_status, embedding) = tokio::try_join!(
        check_table(Table::Sessions),
        check_table(Table::Messages),
        check_table(Table::Parts),
        to.index_status(),
        to.embedding_progress(),
    )?;

    let fts_present = index_status
        .iter()
        .filter(|status| status.exists)
        .any(|status| status.intent_name == MESSAGES_FTS_INDEX);
    let vector_present = index_status
        .iter()
        .filter(|status| status.exists)
        .any(|status| status.intent_name == MESSAGES_VECTOR_INDEX);
    let below_activation = embedding.embedded < VECTOR_INDEX_ACTIVATION_ROWS;

    Ok(StorageVerify {
        tables: vec![sessions, messages, parts],
        dest_indexes: IndexCoverage {
            fts_present,
            vector_present_or_below_activation: vector_present || below_activation,
        },
    })
}
/// Rejects a verification whose source side holds no rows at all.
///
/// # Errors
/// Fails with a hint to check `--from` when the source row total is zero.
fn ensure_source_not_empty(verify: &StorageVerify, from_display: &str) -> anyhow::Result<()> {
    match verify.total_source_rows() {
        0 => bail!(
            "source store {from_display} has 0 rows; check --from (a mistyped source opens as an empty store)"
        ),
        _ => Ok(()),
    }
}
/// Prints the verification report and returns whether the stores are in sync.
///
/// Failures (missing rows, destination duplicates) go to stderr via
/// `output_err`; the success line and the pending-index hint go to `output`.
fn render_storage_verify(
    verify: &StorageVerify,
    from_display: &str,
    to_display: &str,
) -> anyhow::Result<VerifyOutcome> {
    use pond::output::{dim, paint};

    // Failure path first: report each kind of discrepancy that is present.
    if !verify.synced() {
        let missing_total = verify.total_missing();
        if missing_total > 0 {
            let headline = format!(
                "{} FAILED - {} source rows missing from destination ({})",
                paint("verify:", dim()),
                format_thousands(missing_total as u64),
                table_breakdown(verify, |t| t.missing),
            );
            output_err(&headline)?;
            let fix = format!(
                " fix: re-run `pond copy --from {from_display} --to {to_display}` (idempotent; copies only the missing rows)"
            );
            output_err(&paint(&fix, dim()))?;
        }

        let duplicate_total = verify.total_duplicates();
        if duplicate_total > 0 {
            let headline = format!(
                "{} FAILED - {} duplicate rows on destination ({}) - a write anomaly, not a copy gap; investigate before querying (re-copy will not remove them)",
                paint("verify:", dim()),
                format_thousands(duplicate_total as u64),
                table_breakdown(verify, |t| t.dest_duplicates),
            );
            output_err(&headline)?;
        }

        return Ok(VerifyOutcome { synced: false });
    }

    // Success path: list every table with its source row count.
    let mut per_table = Vec::with_capacity(verify.tables.len());
    for entry in &verify.tables {
        per_table.push(format!(
            "{} {}",
            entry.table.as_str(),
            format_thousands(entry.source_rows as u64)
        ));
    }
    let detail = per_table.join(", ");

    let summary = format!(
        "{} SYNCED - destination contains all {} source rows ({})",
        paint("verify:", dim()),
        format_thousands(verify.total_source_rows() as u64),
        detail,
    );
    output(&summary)?;

    // Data can be complete while the destination's indexes are not built yet.
    if !verify.dest_indexes.ready() {
        let hint = format!(
            "{} data copied; indexes pending - run `pond optimize --only index --storage-path {to_display}` before querying",
            paint("verify:", dim()),
        );
        output(&hint)?;
    }

    Ok(VerifyOutcome { synced: true })
}
/// Renders `"<table> <count>"` for every table whose `count` is non-zero,
/// joined with `", "`.
fn table_breakdown(verify: &StorageVerify, count: impl Fn(&TableVerify) -> usize) -> String {
    let mut pieces: Vec<String> = Vec::new();
    for entry in &verify.tables {
        if count(entry) > 0 {
            pieces.push(format!(
                "{} {}",
                entry.table.as_str(),
                format_thousands(count(entry) as u64)
            ));
        }
    }
    pieces.join(", ")
}
/// Which optimize stages a command invocation should run.
#[derive(Debug, Default)]
struct OptimizeStages {
    /// Run the embed stage.
    embed: bool,
    /// Run the index stage.
    index: bool,
}

impl OptimizeStages {
    /// Combines an optional `only` selection with a `skip` list.
    ///
    /// With no `only`, both stages start enabled; every stage named in `skip`
    /// is then disabled.
    ///
    /// # Errors
    /// Fails when the combination leaves no stage enabled.
    fn resolve(only: Option<OptimizeStage>, skip: &[OptimizeStage]) -> anyhow::Result<Self> {
        // A stage runs when `only` does not exclude it and `skip` does not name it.
        let enabled = |stage: OptimizeStage| {
            only.map_or(true, |chosen| chosen == stage) && !skip.contains(&stage)
        };
        let embed = enabled(OptimizeStage::Embed);
        let index = enabled(OptimizeStage::Index);
        if !embed && !index {
            bail!("no optimize stages selected");
        }
        Ok(Self { embed, index })
    }
}
/// Options for a single sync run, consumed by `run_sync`.
pub(crate) struct SyncInvocation {
/// Optional adapter name; passed to `resolve_sync_adapters` to pick one adapter.
pub adapter: Option<String>,
/// Optional explicit source path; `resolve_sync_adapters` requires `adapter` to be set with it.
pub path: Option<PathBuf>,
/// When set, the import re-reads every source instead of using the rowmap freshness skip.
pub verify: bool,
/// When set, `run_sync` only prints a plan and returns without running the sync stages.
pub dry_run: bool,
/// When set, a busy sync lock makes `run_sync` report "skipped" instead of waiting.
pub no_wait: bool,
/// Output format; `OutputFormat::Json` switches `run_sync` to JSON reporting.
pub format: OutputFormat,
}
impl SyncInvocation {
/// Baseline invocation: no adapter or path filter, every flag off, text output.
pub(crate) fn defaults() -> Self {
Self {
adapter: None,
path: None,
verify: false,
dry_run: false,
no_wait: false,
format: OutputFormat::Text,
}
}
}
/// Counters filled in by `run_sync_stages` and reported by `run_sync`.
#[derive(Default)]
struct SyncReport {
/// Sessions inserted by the import stage.
sessions_inserted: u64,
/// Searchable messages inserted by the import stage.
messages_inserted: u64,
/// Set once the index update stage has run (it only runs when rows were inserted).
indexes_folded: bool,
/// Total stored sessions; only populated in JSON mode.
stored_sessions: Option<u64>,
/// Total stored messages; only populated in JSON mode.
stored_messages: Option<u64>,
}
/// Runs one sync: either a dry-run plan, or the full sync stages under the
/// per-store sync lock.
///
/// In JSON mode every failure is also printed as a JSON object before the
/// error is returned. After the stages finish (successfully or not) a
/// last-sync record is written for the store.
pub(crate) async fn run_sync(
loaded: &Config,
config_file: &Path,
storage_path: Option<StorageUrl>,
invocation: SyncInvocation,
) -> anyhow::Result<()> {
let json = matches!(invocation.format, OutputFormat::Json);
// Dry runs return here, before the sync lock is acquired or a last-sync record is written.
if invocation.dry_run {
let outcome = run_sync_dry_run(loaded, config_file, storage_path, &invocation).await;
if json && let Err(error) = &outcome {
emit_json_error(error)?;
}
return outcome;
}
let cmd_started = std::time::Instant::now();
// Resolve the storage location and derive the key used for the lock and the last-sync record.
// Wrapped in a closure so both fallible steps share one JSON error path below.
let setup = (|| -> anyhow::Result<(StorageUrl, String)> {
let storage = resolve_storage_location(storage_path, loaded)?;
let store_key = pond::substrate::store_key(storage.resolve(&loaded.creds)?.lance_url());
Ok((storage, store_key))
})();
let (storage, store_key) = match setup {
Ok(setup) => setup,
Err(error) => {
if json {
emit_json_error(&error)?;
}
return Err(error);
}
};
let lock_state = match syncstate::try_acquire_sync_lock(&store_key) {
Ok(state) => state,
Err(error) => {
if json {
emit_json_error(&error)?;
}
return Err(error);
}
};
// The guard is bound to `_lock` so it stays alive until this function returns
// (presumably releasing the lock on drop — confirm in `syncstate`).
let _lock = match lock_state {
syncstate::SyncLockState::Acquired(guard) => guard,
syncstate::SyncLockState::Busy(holder) => {
// With --no-wait a busy lock is a successful no-op, reported as "skipped".
if invocation.no_wait {
if json {
output(&serde_json::to_string_pretty(&serde_json::json!({
"outcome": "skipped",
"reason": "another pond sync is already running against this store",
"holder_pid": holder.as_ref().map(|holder| holder.pid),
}))?)?;
} else {
output(&format!(
"sync skipped: another pond sync is already running ({})",
describe_lock_holder(holder.as_ref()),
))?;
}
return Ok(());
}
// Otherwise wait until the lock can be acquired.
match wait_for_sync_lock(&store_key, holder).await {
Ok(guard) => guard,
Err(error) => {
if json {
emit_json_error(&error)?;
}
return Err(error);
}
}
}
};
let mut report = SyncReport::default();
let outcome = run_sync_stages(
loaded,
config_file,
Some(storage),
&invocation,
json,
&mut report,
)
.await;
let duration = cmd_started.elapsed();
// Record the run whether it succeeded or failed, including the error text on failure.
syncstate::write_last_sync(
&store_key,
&syncstate::LastSyncRecord {
finished_at: Utc::now(),
duration_secs: duration.as_secs_f64(),
sessions_inserted: report.sessions_inserted,
messages_inserted: report.messages_inserted,
outcome: if outcome.is_ok() {
syncstate::SyncOutcome::Ok
} else {
syncstate::SyncOutcome::Error
},
error: outcome.as_ref().err().map(|error| format!("{error:#}")),
},
);
// JSON failure report carries whatever counts were collected before the error.
if json && let Err(error) = &outcome {
output(&serde_json::to_string_pretty(&serde_json::json!({
"outcome": "error",
"error": format!("{error:#}"),
"sessions_inserted": report.sessions_inserted,
"messages_inserted": report.messages_inserted,
"duration_secs": duration.as_secs_f64(),
}))?)?;
}
outcome?;
if json {
output(&serde_json::to_string_pretty(&serde_json::json!({
"outcome": "ok",
"sessions_inserted": report.sessions_inserted,
"messages_inserted": report.messages_inserted,
"indexes_folded": report.indexes_folded,
"stored": {
"sessions": report.stored_sessions,
"messages": report.stored_messages,
},
"duration_secs": duration.as_secs_f64(),
}))?)?;
} else {
output(&format!(
"{} sync complete in {}",
pond::output::paint("done -", pond::output::dim()),
elapsed_hms(duration),
))?;
}
Ok(())
}
/// Executes the sync pipeline against an opened store: embedding model warm-up,
/// import, index update (only when rows were inserted), then the summary.
///
/// Progress lines are printed only when `json` is false; counters are written
/// into `report` for the caller to render.
async fn run_sync_stages(
loaded: &Config,
config_file: &Path,
storage_path: Option<StorageUrl>,
invocation: &SyncInvocation,
json: bool,
report: &mut SyncReport,
) -> anyhow::Result<()> {
let open_started = std::time::Instant::now();
let (_, store) = open_store(storage_path, loaded, true, false).await?;
let embedder = Arc::new(LazyEmbedder::candle());
let flush_hud = Arc::new(FlushHud::default());
// Ingest-time embedding progress is forwarded to the shared HUD.
let store = store
.with_embedder(embedder.clone())
.with_ingest_embed_progress(pond::sessions::IngestEmbedProgress(Arc::new({
let hud = flush_hud.clone();
move |done, total| hud.embed_tick(done, total)
})));
tracing::debug!(target: "pond::perf", stage = "open_store", elapsed_ms = open_started.elapsed().as_millis() as u64, "sync stage");
let model_started = std::time::Instant::now();
// Resolve the embedder up front; a failure here is only a warning and the sync continues.
match embedder.get().await {
Ok(_) => {
// Only mention the model when resolving it took longer than one second.
if !json && model_started.elapsed() > Duration::from_secs(1) {
output(&stage_line(
model_started.elapsed(),
"model",
"embedding model ready",
))?;
}
}
Err(error) => {
output_err(&pond::output::paint(
&format!(
"model: embedding model unavailable ({error:#}); continuing - the sync fails only if new sessions need embedding"
),
pond::output::yellow(),
))?;
}
}
let import_started = std::time::Instant::now();
let import_summary = run_import_stage(
&store,
loaded,
config_file,
invocation.adapter.clone(),
invocation.path.clone(),
invocation.verify,
&flush_hud,
)
.await?;
report.sessions_inserted = import_summary.sessions_inserted as u64;
report.messages_inserted = import_summary.messages_inserted_searchable as u64;
let any_new_rows = import_summary.inserted > 0;
if !json {
output(&stage_line(
import_started.elapsed(),
"import",
&format!(
"+{} sessions, +{} searchable messages",
format_thousands(import_summary.sessions_inserted as u64),
format_thousands(import_summary.messages_inserted_searchable as u64),
),
))?;
}
// Index maintenance is skipped entirely when the import inserted nothing.
if any_new_rows {
// A failed rowmap refresh is logged and ignored.
if let Err(error) = store.ensure_rowmap(&default_cache_dir()).await {
tracing::warn!(%error, "post-import rowmap refresh skipped");
}
guard_embedding_model_unchanged(&store).await?;
// Start from the configured policy and apply the sync-specific cleanup interval and fold thresholds.
let policy = configured_maintenance_policy(loaded, None)?
.with_cleanup_interval(pond::substrate::DEFAULT_SYNC_CLEANUP_INTERVAL)
.with_scalar_fold_row_threshold(pond::substrate::DEFAULT_SYNC_SCALAR_FOLD_ROWS)
.with_index_fold_row_threshold(pond::substrate::DEFAULT_SYNC_INDEX_FOLD_ROWS);
let indexes_started = std::time::Instant::now();
run_update_indexes_stage(&store, &policy).await?;
report.indexes_folded = true;
if !json {
output(&stage_line(
indexes_started.elapsed(),
"indexes",
"fold complete",
))?;
}
}
let summary_started = std::time::Instant::now();
// JSON mode records totals for the caller; text mode prints the summary directly.
if json {
let (sessions, messages, _) = store.row_counts().await?;
report.stored_sessions = Some(sessions as u64);
report.stored_messages = Some(messages as u64);
} else {
render_sync_summary(&store).await?;
}
tracing::debug!(target: "pond::perf", stage = "render_summary", elapsed_ms = summary_started.elapsed().as_millis() as u64, "sync stage");
Ok(())
}
/// Prints what a sync would do, per adapter, without importing anything.
///
/// For each resolved adapter it asks for a plan; adapters that return no plan
/// fall back to a plain source discovery count. Output is JSON or text
/// depending on `invocation.format`.
async fn run_sync_dry_run(
loaded: &Config,
config_file: &Path,
storage_path: Option<StorageUrl>,
invocation: &SyncInvocation,
) -> anyhow::Result<()> {
let json = matches!(invocation.format, OutputFormat::Json);
let adapters = resolve_sync_adapters(
loaded,
invocation.adapter.as_deref(),
invocation.path.clone(),
)?;
// Nothing to plan: explain on stderr and, in JSON mode, still emit an empty plan.
if adapters.is_empty() {
output_err(&format!(
"{} no enabled adapters - run `pond adapters discover` (or `pond init`) to enable some, or add `[adapters.<name>]` blocks to {}",
pond::output::paint("plan:", pond::output::dim()),
config_file.display(),
))?;
if json {
output(&serde_json::to_string_pretty(&serde_json::json!({
"dry_run": true,
"adapters": [],
}))?)?;
}
return Ok(());
}
let (_, store) = open_store(storage_path, loaded, true, false).await?;
let noop = pond::adapter::NoopOracle;
// Declared before the `if` so the borrow taken in the `else` branch outlives it.
let rowmap_oracle;
// --verify plans against the no-op oracle; otherwise against a snapshot of the store's rowmap.
let oracle: &dyn pond::adapter::SkipOracle = if invocation.verify {
&noop
} else {
ensure_rowmap_with_spinner(&store).await;
rowmap_oracle = pond::sessions::RowmapOracle(store.rowmap_snapshot());
&rowmap_oracle
};
// (adapter name, source count, optional plan) for every adapter.
let mut rows: Vec<(String, usize, Option<pond::adapter::SyncPlan>)> = Vec::new();
for (name, blob) in adapters {
let factory = adapter::by_name(&name).ok_or_else(|| {
anyhow::anyhow!(
"unknown adapter {name:?}; known: {}",
adapter::known_names().join(", "),
)
})?;
let opened = factory.open(blob)?;
let plan = opened.plan(oracle).await?;
// Without a plan the source count comes from a separate discovery call.
let sources = match &plan {
Some(plan) => plan.sources,
None => opened.discover().await?,
};
rows.push((name, sources, plan));
}
if json {
// `fresh` and `pending` serialize as null for adapters that returned no plan.
let adapters: Vec<serde_json::Value> = rows
.iter()
.map(|(name, sources, plan)| {
serde_json::json!({
"name": name,
"sources": sources,
"fresh": plan.map(|plan| plan.fresh),
"pending": plan.map(|plan| plan.pending),
})
})
.collect();
output(&serde_json::to_string_pretty(&serde_json::json!({
"dry_run": true,
"adapters": adapters,
}))?)?;
return Ok(());
}
let label = pond::output::paint("plan", pond::output::dim());
for (name, sources, plan) in &rows {
let detail = match plan {
Some(plan) if plan.pending == 0 => {
format!("{} sources - up to date", format_thousands(*sources as u64))
}
Some(plan) => format!(
"{} sources - {} to sync, {} fresh",
format_thousands(*sources as u64),
format_thousands(plan.pending as u64),
format_thousands(plan.fresh as u64),
),
None => format!(
"{} sources (pending unknown - this adapter has no cheap freshness preview)",
format_thousands(*sources as u64),
),
};
output(&format!("{label} {name:<12} {detail}"))?;
}
output(&pond::output::paint(
"dry run - nothing written to the store",
pond::output::dim(),
))?;
Ok(())
}
/// Prints `error` as a pretty-printed JSON object with `outcome` set to `"error"`.
fn emit_json_error(error: &anyhow::Error) -> anyhow::Result<()> {
    // `{:#}` renders the whole anyhow context chain on one line.
    let message = format!("{error:#}");
    let payload = serde_json::json!({
        "outcome": "error",
        "error": message,
    });
    let rendered = serde_json::to_string_pretty(&payload)?;
    output(&rendered)
}
/// Short human-readable description of whoever holds the sync lock.
fn describe_lock_holder(holder: Option<&syncstate::SyncLockHolder>) -> String {
    holder.map_or_else(
        || "holder unknown".to_owned(),
        |info| format!("pid {}, running for {}", info.pid, ago(info.started_at)),
    )
}
/// Polls once per second until the sync lock for `store_key` can be acquired,
/// showing a spinner that names the current holder.
///
/// # Errors
/// Returns the first error produced by an acquisition attempt.
async fn wait_for_sync_lock(
    store_key: &str,
    mut holder: Option<syncstate::SyncLockHolder>,
) -> anyhow::Result<syncstate::SyncLockGuard> {
    let spinner = ProgressBar::new_spinner();
    let style = ProgressStyle::with_template("{spinner:.green} {wide_msg}")
        .unwrap_or_else(|_| ProgressStyle::default_spinner());
    spinner.set_style(style);
    spinner.enable_steady_tick(Duration::from_millis(120));

    // The loop yields the final result so the spinner is cleared in exactly one place.
    let acquired = loop {
        let message = format!(
            "another pond sync is running ({}); waiting for it to finish - Ctrl-C to stop waiting, --no-wait to skip instead",
            describe_lock_holder(holder.as_ref()),
        );
        spinner.set_message(message);

        match syncstate::try_acquire_sync_lock(store_key) {
            Ok(syncstate::SyncLockState::Acquired(guard)) => break Ok(guard),
            // Still busy: remember the latest holder for the next message.
            Ok(syncstate::SyncLockState::Busy(current)) => holder = current,
            Err(error) => break Err(error),
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    };

    spinner.finish_and_clear();
    acquired
}
/// Builds the store's rowmap while showing a spinner on the terminal.
///
/// A failed build is logged as a warning and otherwise ignored.
async fn ensure_rowmap_with_spinner(store: &Store) {
    let timer = std::time::Instant::now();

    let style = ProgressStyle::with_template(
        "{spinner:.green} checking what this store already holds (one-time scan on a new host)... [{elapsed_precise}]",
    )
    .unwrap_or_else(|_| ProgressStyle::default_spinner());
    let spinner = ProgressBar::new_spinner();
    spinner.set_style(style);
    spinner.enable_steady_tick(Duration::from_millis(120));

    match store.ensure_rowmap(&default_cache_dir()).await {
        Ok(_) => {}
        Err(error) => {
            tracing::warn!(%error, "rowmap build for sync oracle skipped; re-reading all sources");
        }
    }

    spinner.finish_and_clear();
    let elapsed_ms = timer.elapsed().as_millis() as u64;
    tracing::debug!(target: "pond::perf", stage = "ensure_rowmap", elapsed_ms = elapsed_ms, "sync stage");
}
/// Shared handle to the progress display of the adapter currently syncing.
///
/// The slot is empty until `attach` is called and is emptied again by `detach`.
/// Every method silently does nothing when the mutex is poisoned.
#[derive(Default)]
struct FlushHud {
    inner: std::sync::Mutex<Option<FlushHudSlot>>,
}

/// State for the attached adapter's progress bar and heartbeat output.
struct FlushHudSlot {
    /// Adapter name shown in status lines.
    adapter: String,
    /// Handle to the adapter's progress bar.
    bar: ProgressBar,
    /// Start time used for elapsed-time display.
    started: std::time::Instant,
    /// Number of sessions in the commit currently being flushed.
    pending_sessions: usize,
    /// Whether stderr is a terminal; heartbeats are only printed when it is not.
    stderr_tty: bool,
    /// When the last heartbeat line was printed.
    last_heartbeat: std::time::Instant,
}

impl FlushHud {
    /// Runs `action` on the attached slot, if the lock is healthy and a slot is attached.
    fn with_slot(&self, action: impl FnOnce(&mut FlushHudSlot)) {
        if let Ok(mut guard) = self.inner.lock()
            && let Some(slot) = guard.as_mut()
        {
            action(slot);
        }
    }

    /// Attaches a fresh slot for `adapter`, replacing any previous one.
    fn attach(&self, adapter: &str, bar: &ProgressBar, started: std::time::Instant, tty: bool) {
        let Ok(mut guard) = self.inner.lock() else {
            return;
        };
        *guard = Some(FlushHudSlot {
            adapter: adapter.to_owned(),
            bar: bar.clone(),
            started,
            pending_sessions: 0,
            stderr_tty: tty,
            last_heartbeat: std::time::Instant::now(),
        });
    }

    /// Records how many sessions the in-flight commit covers.
    fn set_pending(&self, pending: usize) {
        self.with_slot(|slot| slot.pending_sessions = pending);
    }

    /// Empties the slot.
    fn detach(&self) {
        if let Ok(mut guard) = self.inner.lock() {
            guard.take();
        }
    }

    /// Offers `line` as a heartbeat; `heartbeat_line` decides whether it is printed.
    fn heartbeat(&self, line: &str) {
        self.with_slot(|slot| heartbeat_line(slot.stderr_tty, &mut slot.last_heartbeat, line));
    }

    /// Updates the bar message (and possibly the heartbeat) with embedding progress.
    fn embed_tick(&self, done: usize, total: usize) {
        self.with_slot(|slot| {
            let tail = format!(
                "committing {} sessions: embedding {}/{} messages",
                format_thousands(slot.pending_sessions as u64),
                format_thousands(done as u64),
                format_thousands(total as u64),
            );
            let line = sync_status_line(
                &slot.adapter,
                slot.bar.position(),
                slot.bar.length().unwrap_or(0),
                slot.started.elapsed(),
                Some(slot.bar.eta()),
                &tail,
            );
            heartbeat_line(slot.stderr_tty, &mut slot.last_heartbeat, &line);
            slot.bar.set_message(line);
        });
    }
}
/// Prints `line` to stderr as a heartbeat when stderr is not a terminal and at
/// least `SYNC_HEARTBEAT_EVERY` has passed since the previous heartbeat.
///
/// A failed write is ignored; the heartbeat timestamp is refreshed either way.
fn heartbeat_line(stderr_tty: bool, last_heartbeat: &mut std::time::Instant, line: &str) {
    if stderr_tty || last_heartbeat.elapsed() < SYNC_HEARTBEAT_EVERY {
        return;
    }
    let _ = output_err(line);
    *last_heartbeat = std::time::Instant::now();
}
/// Formats the time elapsed since `then` as a brief duration.
fn ago(then: DateTime<Utc>) -> String {
    let signed = Utc::now().signed_duration_since(then);
    // When the signed span cannot be converted to a std `Duration`, fall back to zero.
    let elapsed = signed.to_std().unwrap_or_default();
    brief_duration(elapsed)
}
/// Renders a duration coarsely: seconds under a minute, whole minutes under an
/// hour, hours (plus minutes when non-zero) under a day, otherwise whole days.
fn brief_duration(duration: Duration) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 3_600;
    const DAY: u64 = 86_400;

    match duration.as_secs() {
        secs if secs < MINUTE => format!("{secs}s"),
        secs if secs < HOUR => format!("{}m", secs / MINUTE),
        secs if secs < DAY => match (secs / HOUR, (secs % HOUR) / MINUTE) {
            (hours, 0) => format!("{hours}h"),
            (hours, minutes) => format!("{hours}h {minutes}m"),
        },
        secs => format!("{}d", secs / DAY),
    }
}
/// Imports from every resolved adapter into `store` and returns the merged summary.
///
/// Returns a default (empty) summary after printing guidance when no adapter
/// is enabled. With `verify` set, sources are read through the no-op oracle;
/// otherwise a snapshot of the store's rowmap is used as the skip oracle.
async fn run_import_stage(
store: &Store,
loaded: &Config,
config_file: &Path,
adapter: Option<String>,
path: Option<PathBuf>,
verify: bool,
flush_hud: &Arc<FlushHud>,
) -> anyhow::Result<IngestSummary> {
let adapters = resolve_sync_adapters(loaded, adapter.as_deref(), path)?;
if adapters.is_empty() {
// Distinguish "nothing configured" from "everything disabled" in the guidance.
let disabled = loaded.disabled_adapter_names();
let label = pond::output::paint("import:", pond::output::dim());
if disabled.is_empty() {
output_err(&format!(
"{label} no adapters configured. Run `pond adapters discover` (or `pond init`) to detect and enable adapters, or add `[adapters.<name>]` blocks to {}.",
config_file.display(),
))?;
} else {
output_err(&format!(
"{label} no enabled adapters. Found {} disabled: {}. Enable one with `pond adapters enable <name>` (or add `enabled = true` to its section in {}).",
disabled.len(),
disabled.join(", "),
config_file.display(),
))?;
}
return Ok(IngestSummary::default());
}
let noop = pond::adapter::NoopOracle;
// Declared before the `if` so the borrow taken in the `else` branch outlives it.
let rowmap_oracle;
let oracle: &dyn pond::adapter::SkipOracle = if verify {
output_err(&pond::output::paint(
"import: --verify: re-reading every source body, bypassing the freshness skip",
pond::output::yellow(),
))?;
&noop
} else {
ensure_rowmap_with_spinner(store).await;
rowmap_oracle = pond::sessions::RowmapOracle(store.rowmap_snapshot());
&rowmap_oracle
};
// An empty rowmap oracle is treated as a first sync from this host; warn that it may be slow.
if !verify && oracle.is_empty() {
output_err(&pond::output::paint(
"plan: first sync from this host - every source is read and embedded in full, which can take a while on a large history. Ctrl-C is safe: the next sync resumes where this one stopped.",
pond::output::yellow(),
))?;
}
// Progress bars are drawn on stderr.
let mp = indicatif::MultiProgress::with_draw_target(
indicatif::ProgressDrawTarget::stderr_with_hz(8),
);
let mut total = IngestSummary::default();
// Adapters run one after another; the first failure aborts the stage.
for (name, blob) in adapters {
let summary = sync_with_progress(store, &mp, &name, blob, oracle, flush_hud).await?;
total.merge(&summary);
}
Ok(total)
}
/// Fails when the store reports that its embedding model has been swapped.
///
/// # Errors
/// Returns an error telling the user to run `pond optimize --force-embed`,
/// or propagates a failure of the store query itself.
async fn guard_embedding_model_unchanged(store: &Store) -> anyhow::Result<()> {
    let swapped = store.embedding_model_swapped().await?;
    if !swapped {
        return Ok(());
    }
    bail!(
        "messages embedded under a different model id; run `pond optimize --force-embed` \
         to re-embed under the configured model {:?}",
        pond::embed::model_id(),
    )
}
/// Runs the embed stage with no row limit, naming `--force-embed` as the flag
/// to use when a model swap blocks the run.
async fn run_embed_stage(store: &Store, force: bool) -> anyhow::Result<EmbedSummary> {
    let unlimited: Option<usize> = None;
    let force_flag: &'static str = "--force-embed";
    run_embed_stage_with_limit(store, force, unlimited, force_flag).await
}
/// Embeds the store's outstanding messages, drawing a progress bar on stderr.
///
/// * `force` - allows the run to proceed when the store reports that its
///   embedding model was swapped (the vector index is dropped first), and makes
///   the worker include stale rows.
/// * `limit` - optional cap on the number of messages processed in this run.
/// * `force_hint` - the CLI flag name shown to the user in messages about
///   forcing a re-embed.
///
/// Returns the worker's summary, or a default summary when there is no backlog
/// and no model swap.
///
/// # Errors
/// Fails when the model was swapped and `force` is false, when the embedder
/// cannot be loaded, or when a store or worker operation fails.
async fn run_embed_stage_with_limit(
    store: &Store,
    force: bool,
    limit: Option<usize>,
    force_hint: &'static str,
) -> anyhow::Result<EmbedSummary> {
    let swapped = store.embedding_model_swapped().await?;
    if swapped {
        if !force {
            bail!(
                "messages embedded under a different model id; pass `{force_hint}` \
                 to re-embed (the vector index will be rebuilt under the configured \
                 model {:?})",
                pond::embed::model_id(),
            );
        }
        // Name the flag this caller actually exposes, matching the error above,
        // instead of always claiming `--force-embed`.
        output(&pond::output::paint(
            &format!("embed: {force_hint}: re-embedding stale-model rows after dropping IVF_SQ"),
            pond::output::yellow(),
        ))?;
        store.drop_vector_index().await?;
    }

    // After a model swap the backlog query differs from the steady-state one.
    let backlog = if swapped {
        store.embed_backlog_count().await?
    } else {
        store.unindexed_vector_backlog().await?
    };
    let bar_total = match limit {
        Some(cap) => backlog.min(cap),
        None => backlog,
    };
    if bar_total == 0 && !swapped {
        return Ok(EmbedSummary::default());
    }

    let embedder = CandleEmbedder::load()?;
    let device = embedder.device().to_owned();
    let bar = ProgressBar::with_draw_target(
        Some(bar_total as u64),
        indicatif::ProgressDrawTarget::stderr_with_hz(8),
    );
    bar.set_style(
        ProgressStyle::with_template("{wide_msg}").unwrap_or_else(|_| ProgressStyle::default_bar()),
    );
    bar.enable_steady_tick(Duration::from_millis(120));

    // First Ctrl-C sets the cancel flag handed to the worker; a second one exits with status 130.
    let cancel = Arc::new(std::sync::atomic::AtomicBool::new(false));
    {
        let cancel = cancel.clone();
        tokio::spawn(async move {
            let _ = tokio::signal::ctrl_c().await;
            cancel.store(true, std::sync::atomic::Ordering::Relaxed);
            eprintln!("\ninterrupted; flushing window (Ctrl-C again to abort)...");
            let _ = tokio::signal::ctrl_c().await;
            std::process::exit(130);
        });
    }

    let started = std::time::Instant::now();
    let bar_for_callback = bar.clone();
    let mut worker = EmbedWorker::new(store, &embedder)
        .with_cancel(cancel)
        .with_progress(move |progress: BatchProgress| {
            let pos = progress.total_messages as u64;
            let len = bar_for_callback.length().unwrap_or(0);
            // Clamp the divisor so the very first tick cannot divide by zero.
            let secs = started.elapsed().as_secs_f64().max(0.001);
            let rate = progress.total_messages as f64 / secs;
            bar_for_callback.set_position(pos);
            bar_for_callback.set_message(format!(
                "semantic embedding [{}] [{}] {pos}/{len} messages {rate:.0} msgs/s",
                elapsed_hms(started.elapsed()),
                progress_glyphs(pos, len, 24),
            ));
        });
    if force {
        worker = worker.include_stale();
    }
    if let Some(limit) = limit {
        worker = worker.with_limit(limit);
    }

    // Clear the bar before propagating a failure so the error is not printed
    // beneath a stale progress line.
    let run_result = worker.run().await;
    bar.finish_and_clear();
    let summary = run_result?;

    if summary.messages > 0 {
        let label = if summary.cancelled {
            "semantic embedding (interrupted)"
        } else {
            "semantic embedding"
        };
        output(&format!(
            "{} +{} messages ({})",
            pond::output::paint(label, pond::output::dim()),
            format_thousands(summary.messages as u64),
            device,
        ))?;
    }
    Ok(summary)
}
fn configured_maintenance_policy(
config: &Config,
cleanup_override: Option<chrono::Duration>,
) -> anyhow::Result<MaintenancePolicy> {
let compaction_fragment_cap = config
.maintenance
.compaction_fragment_cap
.unwrap_or(pond::substrate::DEFAULT_COMPACTION_FRAGMENT_CAP);
let configured_cleanup = config
.maintenance
.cleanup_older_than
.as_deref()
.map(parse_retention)
.transpose()
.map_err(|err| anyhow::anyhow!("invalid [maintenance].cleanup_older_than: {err}"))?;
let cleanup_older_than = cleanup_override
.or(configured_cleanup)
.unwrap_or_else(default_cleanup_older_than);
if cleanup_older_than < chrono::Duration::hours(1) {
anyhow::bail!(
"cleanup retention below the 1h floor; set [maintenance].cleanup_older_than \
(or --cleanup-older-than) to 1h or longer"
);
}
Ok(MaintenancePolicy {
compaction_fragment_cap,
cleanup_older_than,
cleanup_interval: 1,
scalar_fold_row_threshold: 0,
index_fold_row_threshold: 0,
})
}
/// Runs index optimization on `store` under `policy` with a progress bar, then
/// prints any follow-up hints for the outcome.
///
/// # Errors
/// Propagates failures from the optimization itself or from rendering hints.
async fn run_update_indexes_stage(
    store: &Store,
    policy: &MaintenancePolicy,
) -> anyhow::Result<OptimizeOutcome> {
    let (progress, bar) = optimize_progress_bar();
    let result = store.optimize_indices(Some(progress), policy).await;
    // Clear the bar on both paths so a failure is not reported beneath a stale progress line.
    bar.finish_and_clear();
    let outcome = result?;
    render_optimize_hints(&outcome)?;
    Ok(outcome)
}
/// Runs the embed stage followed by the index update stage, logging the
/// duration of each at debug level.
///
/// An embed failure aborts before indexing; the index stage's result is
/// returned as-is.
async fn finalize_indexes(
    store: &Store,
    policy: &MaintenancePolicy,
    force_embed: bool,
) -> anyhow::Result<OptimizeOutcome> {
    let embed_timer = std::time::Instant::now();
    run_embed_stage(store, force_embed).await?;
    let embed_ms = embed_timer.elapsed().as_millis() as u64;
    tracing::debug!(target: "pond::perf", stage = "embed", elapsed_ms = embed_ms, "sync stage");

    let optimize_timer = std::time::Instant::now();
    let result = run_update_indexes_stage(store, policy).await;
    let optimize_ms = optimize_timer.elapsed().as_millis() as u64;
    tracing::debug!(target: "pond::perf", stage = "optimize", elapsed_ms = optimize_ms, "sync stage");
    result
}
/// Prints the closing sync summary: a blank line, the index health line and
/// the stored session/message totals.
async fn render_sync_summary(store: &Store) -> anyhow::Result<()> {
    use pond::output::{dim, paint};

    // Index status and row counts are fetched concurrently.
    let (index_status, counts) = tokio::try_join!(store.index_status(), store.row_counts())?;
    let (stored_sessions, stored_messages, _) = counts;

    let fold_threshold = pond::substrate::DEFAULT_SYNC_INDEX_FOLD_ROWS as u64;
    let health = classify_index_health(&index_status, None, fold_threshold);

    output("")?;
    output(&render_indexes_line(&health))?;

    let stored_line = format!(
        "{} {} sessions, {} messages",
        paint("stored", dim()),
        format_thousands(stored_sessions as u64),
        format_thousands(stored_messages as u64),
    );
    output(&stored_line)?;
    Ok(())
}
/// Exports the store into a zip archive at `path`.
///
/// The datasets are first exported into a temporary staging directory together
/// with a `manifest.json`, and the staging directory is then zipped to `path`.
/// Missing parent directories of `path` are created.
async fn export_pond_archive(store: &Store, path: &Path) -> anyhow::Result<LanceArchiveExport> {
    // A bare file name has an empty parent; there is nothing to create then.
    match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => {
            fs::create_dir_all(parent)
                .with_context(|| format!("failed to create {}", parent.display()))?;
        }
        _ => {}
    }

    let staging = tempfile::Builder::new()
        .prefix("pond-export-")
        .tempdir()
        .context("failed to create export staging dir")?;
    let staging_root = staging.path();

    let summary = store
        .export_clean_lance_datasets(&staging_root.join("data"))
        .await?;

    let manifest_json = serde_json::to_vec_pretty(&PondArchiveManifest {
        archive_version: 1,
        pond_version: env!("CARGO_PKG_VERSION").to_owned(),
        protocol_version: PROTOCOL_VERSION,
        created_at: Utc::now(),
        rows: summary.rows,
        source_versions: summary.source_versions,
        embedding_model: pond::embed::model_id().to_owned(),
        embedding_dim: pond::sessions::embedding_dim(),
    })
    .context("failed to serialize archive manifest")?;
    fs::write(staging_root.join("manifest.json"), manifest_json)
        .context("failed to write archive manifest")?;

    zip_directory(staging_root, path)?;
    Ok(summary)
}
/// Imports a zip archive produced by the export path into `store`.
///
/// The archive is unpacked into a temporary staging directory, its
/// `manifest.json` is validated, and the `data` directory is then imported.
///
/// # Errors
/// Fails when the archive cannot be unpacked or parsed, or when its archive
/// version, protocol version or embedding dimension is not the supported one.
async fn import_pond_archive(store: &Store, path: &Path) -> anyhow::Result<LanceArchiveImport> {
    let staging = tempfile::Builder::new()
        .prefix("pond-import-")
        .tempdir()
        .context("failed to create import staging dir")?;
    let staging_root = staging.path();
    unzip_archive(path, staging_root)?;

    let manifest_path = staging_root.join("manifest.json");
    let manifest: PondArchiveManifest = {
        let raw = fs::read(&manifest_path)
            .with_context(|| format!("failed to read {}", manifest_path.display()))?;
        serde_json::from_slice(&raw).context("failed to parse archive manifest")?
    };

    if manifest.archive_version != 1 {
        bail!(
            "unsupported .pond archive version {}; supported: 1",
            manifest.archive_version
        );
    }
    if manifest.protocol_version != PROTOCOL_VERSION {
        bail!(
            "unsupported .pond protocol version {}; supported: {}",
            manifest.protocol_version,
            PROTOCOL_VERSION
        );
    }
    let configured_dim = pond::sessions::embedding_dim();
    if manifest.embedding_dim != configured_dim {
        bail!(
            "archive embedding_dim={} does not match configured embedding_dim={}",
            manifest.embedding_dim,
            configured_dim
        );
    }

    let data_dir = staging_root.join("data");
    store.import_clean_lance_datasets(&data_dir).await
}
/// Writes every file and directory under `source` into a new zip archive at `dest`.
///
/// Entries are visited in file-name order and stored under `/`-separated paths
/// relative to `source`. Files are deflated with mode `0o644`; directories are
/// stored with mode `0o755`. Files close to or above the 4 GiB ZIP32 limit are
/// written as ZIP64 entries so that a large staged file does not abort the
/// archive.
///
/// # Errors
/// Fails when `dest` cannot be created, when walking or reading `source`
/// fails, or when the zip writer reports an error.
fn zip_directory(source: &Path, dest: &Path) -> anyhow::Result<()> {
    // Without the `large_file` option the zip writer aborts an entry that
    // exceeds 4 GiB. The headroom covers deflate output that is slightly larger
    // than an incompressible input.
    const ZIP64_THRESHOLD: u64 = u32::MAX as u64 - (16 << 20);

    let file =
        File::create(dest).with_context(|| format!("failed to create {}", dest.display()))?;
    let mut zip = zip::ZipWriter::new(file);
    let file_options = zip::write::SimpleFileOptions::default()
        .compression_method(zip::CompressionMethod::Deflated)
        .unix_permissions(0o644);
    let dir_options = zip::write::SimpleFileOptions::default()
        .compression_method(zip::CompressionMethod::Stored)
        .unix_permissions(0o755);

    for entry in walkdir::WalkDir::new(source).sort_by_file_name() {
        let entry = entry.context("failed to walk archive staging dir")?;
        let path = entry.path();
        let rel = path
            .strip_prefix(source)
            .context("failed to compute archive relative path")?;
        // The walk yields `source` itself first; it has no entry of its own.
        if rel.as_os_str().is_empty() {
            continue;
        }
        // Zip entry names always use forward slashes, whatever the host separator is.
        let name = rel
            .to_string_lossy()
            .replace(std::path::MAIN_SEPARATOR, "/");
        if entry.file_type().is_dir() {
            zip.add_directory(format!("{name}/"), dir_options)
                .with_context(|| format!("failed to add archive directory {name}"))?;
            continue;
        }

        // Only oversized entries pay for ZIP64 headers; small entries are written as before.
        let size = entry
            .metadata()
            .with_context(|| format!("failed to stat {}", path.display()))?
            .len();
        let entry_options = file_options.large_file(size >= ZIP64_THRESHOLD);

        zip.start_file(&name, entry_options)
            .with_context(|| format!("failed to add archive file {name}"))?;
        let mut input =
            File::open(path).with_context(|| format!("failed to open {}", path.display()))?;
        io::copy(&mut input, &mut zip)
            .with_context(|| format!("failed to write archive file {name}"))?;
    }
    zip.finish().context("failed to finalize archive")?;
    Ok(())
}
fn unzip_archive(source: &Path, dest: &Path) -> anyhow::Result<()> {
let file = File::open(source)
.with_context(|| format!("failed to open archive {}", source.display()))?;
let mut archive = zip::ZipArchive::new(file).context("failed to read .pond archive")?;
for i in 0..archive.len() {
let mut entry = archive
.by_index(i)
.context("failed to read archive entry")?;
let Some(enclosed) = entry.enclosed_name() else {
bail!("archive contains unsafe path {}", entry.name());
};
let outpath = dest.join(enclosed);
if entry.is_dir() {
fs::create_dir_all(&outpath)
.with_context(|| format!("failed to create {}", outpath.display()))?;
continue;
}
if let Some(parent) = outpath.parent() {
fs::create_dir_all(parent)
.with_context(|| format!("failed to create {}", parent.display()))?;
}
let mut output = File::create(&outpath)
.with_context(|| format!("failed to create {}", outpath.display()))?;
io::copy(&mut entry, &mut output)
.with_context(|| format!("failed to extract {}", outpath.display()))?;
}
Ok(())
}
/// Decides which adapters a sync should run, as `(name, config)` pairs.
///
/// * `path` with `name`: a single ad-hoc adapter configured with just that path.
/// * `name` alone: that adapter's `[adapters.<name>]` entry, which must exist.
/// * neither: every adapter the config resolves.
///
/// # Errors
/// Fails when `path` is given without `name`, when `name` is not a known
/// adapter, when the named adapter has no config entry, or when no adapters
/// are configured at all.
fn resolve_sync_adapters(
    config: &Config,
    name: Option<&str>,
    path: Option<PathBuf>,
) -> anyhow::Result<Vec<(String, Value)>> {
    /// Rejects names that no adapter factory answers to.
    fn ensure_known(name: &str) -> anyhow::Result<()> {
        let known = adapter::known_names();
        if known.contains(&name) {
            return Ok(());
        }
        bail!("unknown adapter {name:?}; known: {}", known.join(", "))
    }

    match (path, name) {
        (Some(_), None) => {
            bail!("--path requires an explicit <adapter> positional argument")
        }
        (Some(path), Some(name)) => {
            ensure_known(name)?;
            Ok(vec![(name.to_owned(), json!({ "path": path }))])
        }
        (None, Some(name)) => {
            ensure_known(name)?;
            if !config.adapters.contains_key(name) {
                bail!(
                    "adapter {name:?} has no [adapters.{name}] entry; enable it with `pond adapters enable {name}` (or `pond init`), then re-run `pond sync`"
                );
            }
            config.resolve_adapters(Some(name))
        }
        (None, None) => {
            if config.adapters.is_empty() {
                bail!(
                    "no adapters configured; run `pond adapters discover` (or `pond init`) to detect and enable adapters, then re-run `pond sync`"
                );
            }
            config.resolve_adapters(None)
        }
    }
}
/// Minimum time between heartbeat lines that `heartbeat_line` prints when
/// stderr is not a terminal.
const SYNC_HEARTBEAT_EVERY: Duration = Duration::from_secs(30);
async fn sync_with_progress(
store: &Store,
mp: &indicatif::MultiProgress,
name: &str,
config: Value,
oracle: &dyn pond::adapter::SkipOracle,
flush_hud: &FlushHud,
) -> anyhow::Result<IngestSummary> {
let factory = adapter::by_name(name).ok_or_else(|| {
anyhow::anyhow!(
"unknown adapter {name:?}; known: {}",
adapter::known_names().join(", "),
)
})?;
let adapter = factory.open(config)?;
let bar = mp.add(ProgressBar::new(0));
bar.set_style(
ProgressStyle::with_template("{wide_msg}").unwrap_or_else(|_| ProgressStyle::default_bar()),
);
bar.set_message(sync_status_line(
name,
0,
0,
Duration::ZERO,
None,
"starting...",
));
bar.enable_steady_tick(Duration::from_millis(250));
let mut messages: u64 = 0;
let mut errors: u64 = 0;
let mut drops: u64 = 0;
let started = std::time::Instant::now();
let bar_ref = &bar;
let stderr_tty = io::stderr().is_terminal();
flush_hud.attach(name, &bar, started, stderr_tty);
let paint_line = |bar: &ProgressBar, line: String| {
flush_hud.heartbeat(&line);
bar.set_message(line);
};
let summary = handlers::ingest_adapter(store, adapter.as_ref(), oracle, |event| match event {
SyncEvent::Discovered { total } => {
if let Some(total) = total {
bar_ref.set_length(total as u64);
}
}
SyncEvent::SessionDone(outcome) => {
let dropped_count: usize;
let optional_reason: Option<String>;
let status_label: &str;
match &outcome.status {
SyncStatus::Ok => {
status_label = "ok";
dropped_count = 0;
optional_reason = None;
}
SyncStatus::Partial {
dropped_events,
first_drop_reason,
} => {
drops += *dropped_events as u64;
status_label = "partial";
dropped_count = *dropped_events;
optional_reason = Some(match first_drop_reason {
Some(reason) => {
format!("dropped {dropped_events} event(s) mid-session: {reason}")
}
None => format!("dropped {dropped_events} event(s) mid-session"),
});
}
SyncStatus::Skipped { reason } => {
errors += 1;
status_label = "skipped";
dropped_count = 0;
optional_reason = Some(reason.clone());
}
SyncStatus::Rejected { reason } => {
errors += 1;
status_label = "rejected";
dropped_count = 0;
optional_reason = Some(reason.clone());
}
SyncStatus::Fresh => {
status_label = "fresh";
dropped_count = 0;
optional_reason = None;
}
SyncStatus::Empty => {
let len = bar_ref.length().unwrap_or(0);
bar_ref.set_length(len.saturating_sub(1));
status_label = "empty";
dropped_count = 0;
optional_reason = None;
}
}
messages += outcome.messages as u64;
if !matches!(
outcome.status,
SyncStatus::Ok | SyncStatus::Fresh | SyncStatus::Empty
) {
let _ = mp.println(format_sync_line(name, &outcome, optional_reason.as_deref()));
}
match optional_reason.as_deref() {
None => tracing::info!(
target: "pond::sync",
adapter = name,
status = status_label,
project = outcome.project.as_deref().unwrap_or("-"),
session = outcome.session_id.as_deref().unwrap_or("-"),
messages = outcome.messages,
dropped = dropped_count,
"session done"
),
Some(reason) => tracing::info!(
target: "pond::sync",
adapter = name,
status = status_label,
project = outcome.project.as_deref().unwrap_or("-"),
session = outcome.session_id.as_deref().unwrap_or("-"),
messages = outcome.messages,
dropped = dropped_count,
%reason,
"session done"
),
}
if !matches!(outcome.status, SyncStatus::Empty) {
bar_ref.inc(1);
}
let tail = format_bar_message(messages, drops, errors, started.elapsed());
let line = sync_status_line(
name,
bar_ref.position(),
bar_ref.length().unwrap_or(0),
started.elapsed(),
Some(bar_ref.eta()),
&tail,
);
paint_line(bar_ref, line);
}
SyncEvent::SkippedBulk { status, count } => {
match status {
SyncStatus::Empty => {
let len = bar_ref.length().unwrap_or(0);
bar_ref.set_length(len.saturating_sub(count as u64));
}
_ => bar_ref.inc(count as u64),
}
let tail = format_bar_message(messages, drops, errors, started.elapsed());
let line = sync_status_line(
name,
bar_ref.position(),
bar_ref.length().unwrap_or(0),
started.elapsed(),
Some(bar_ref.eta()),
&tail,
);
paint_line(bar_ref, line);
}
SyncEvent::Flushing { pending } => {
flush_hud.set_pending(pending);
let line = sync_status_line(
name,
bar_ref.position(),
bar_ref.length().unwrap_or(0),
started.elapsed(),
Some(bar_ref.eta()),
&format!(
"committing {} sessions (embedding + writing)...",
format_thousands(pending as u64)
),
);
paint_line(bar_ref, line);
}
})
.await?;
flush_hud.detach();
let tail = format_sync_outcome(&summary, drops, errors, started.elapsed());
let final_line = sync_status_line(
name,
bar.position(),
bar.length().unwrap_or(0),
started.elapsed(),
None,
&tail,
);
if !stderr_tty {
let _ = output_err(&final_line);
}
bar.finish_with_message(final_line);
Ok(summary)
}
/// Builds the closing summary shown at the end of one adapter's sync line.
///
/// Reports "up to date" when the run inserted neither sessions nor searchable
/// messages; otherwise the inserted counts, the elapsed time and the message
/// throughput. Non-zero `drops` and `errors` are appended as trailing counters.
fn format_sync_outcome(
    summary: &IngestSummary,
    drops: u64,
    errors: u64,
    elapsed: Duration,
) -> String {
    let sessions_added = summary.sessions_inserted as u64;
    let messages_added = summary.messages_inserted_searchable as u64;
    let nothing_new = sessions_added == 0 && messages_added == 0;
    let mut line = if nothing_new {
        String::from("up to date")
    } else {
        // Clamp the divisor so a near-instant run cannot divide by zero.
        let rate = messages_added as f64 / elapsed.as_secs_f64().max(0.001);
        format!(
            "+{} sessions (+{} searchable messages) in {} {:.0} msgs/s",
            format_thousands(sessions_added),
            format_thousands(messages_added),
            elapsed_hms(elapsed),
            rate,
        )
    };
    if drops > 0 {
        line += &format!(" {} dropped", format_thousands(drops));
    }
    if errors > 0 {
        line += &format!(" {} err", format_thousands(errors));
    }
    line
}
/// Renders the scrollback line for one finished session.
///
/// `Fresh` sessions get a short "(cached)" form. Every other status prints
/// `reason` when one is supplied, and the project / session / message-count
/// triple otherwise. Lines are prefixed with the local wall-clock time.
fn format_sync_line(adapter: &str, outcome: &SessionOutcome, reason: Option<&str>) -> String {
    use pond::output::{dim, green, paint, red, yellow};
    let (label, style) = match &outcome.status {
        SyncStatus::Ok => ("ok ", green()),
        SyncStatus::Partial { .. } => ("part", yellow()),
        SyncStatus::Skipped { .. } => ("skip", red()),
        SyncStatus::Rejected { .. } => ("rej ", red()),
        SyncStatus::Fresh => ("fresh", green()),
        SyncStatus::Empty => ("empty", dim()),
    };
    let tag = paint(label, style);
    let ts = chrono::Local::now().format("%H:%M:%S");
    let session = outcome.session_id.as_deref().unwrap_or("-");
    if matches!(outcome.status, SyncStatus::Fresh) {
        return format!("[{ts}] {adapter} {tag} session={session} (cached)");
    }
    if let Some(reason) = reason {
        return format!("[{ts}] {adapter} {tag} {reason}");
    }
    let project = outcome.project.as_deref().unwrap_or("-");
    format!(
        "[{ts}] {adapter} {tag} project={project} session={session} msgs={}",
        outcome.messages,
    )
}
/// Formats the running tail of the sync progress line: total messages seen,
/// message throughput, and (only when non-zero) the drop and error counters.
fn format_bar_message(messages: u64, drops: u64, errors: u64, elapsed: Duration) -> String {
    // Clamp the divisor so the very first tick cannot divide by zero.
    let rate = messages as f64 / elapsed.as_secs_f64().max(0.001);
    let mut line = format!("{} msgs {:.0} msgs/s", format_thousands(messages), rate);
    if drops > 0 {
        line += &format!(" {} dropped", format_thousands(drops));
    }
    if errors > 0 {
        line += &format!(" {} err", format_thousands(errors));
    }
    line
}
/// Formats a duration as zero-padded `HH:MM:SS`, truncating sub-second
/// precision. Hours are not wrapped, so long durations simply widen the field.
fn elapsed_hms(elapsed: Duration) -> String {
    let total = elapsed.as_secs();
    let (hours, remainder) = (total / 3600, total % 3600);
    let (minutes, seconds) = (remainder / 60, remainder % 60);
    format!("{hours:02}:{minutes:02}:{seconds:02}")
}
/// Formats one stage log line: a dimmed `[HH:MM:SS]` elapsed stamp, the label
/// padded to eight columns, then the detail text.
fn stage_line(elapsed: Duration, label: &str, detail: &str) -> String {
    let stamp = format!("[{}]", elapsed_hms(elapsed));
    let ts = pond::output::paint(&stamp, pond::output::dim());
    format!("{ts} {label:8}{detail}")
}
/// Draws a fixed-width ASCII progress bar: `#` for the completed share of
/// `len`, `-` for the remainder.
///
/// `pos` is clamped to `len`, and a zero `len` yields an empty bar.
fn progress_glyphs(pos: u64, len: u64, width: usize) -> String {
    let filled = match len {
        0 => 0,
        _ => {
            // Widen to u128 so `pos * width` cannot overflow.
            let done = u128::from(pos.min(len));
            (done * width as u128 / u128::from(len)) as usize
        }
    };
    // `filled <= width` because `done <= len`, so the subtraction cannot wrap.
    let mut bar = "#".repeat(filled);
    bar.push_str(&"-".repeat(width - filled));
    bar
}
/// Composes the single-line sync HUD for one adapter: name, elapsed clock,
/// progress bar, `pos/len` session counter, optional ETA and a free-form tail.
///
/// The ETA is shown only when an estimate was supplied, some but not all
/// sessions are done, and at least five seconds have elapsed.
fn sync_status_line(
    name: &str,
    pos: u64,
    len: u64,
    elapsed: Duration,
    eta: Option<Duration>,
    tail: &str,
) -> String {
    let rate_is_meaningful = pos > 0 && len > pos && elapsed.as_secs() >= 5;
    let eta = eta
        .filter(|_| rate_is_meaningful)
        .map(|estimate| format!(" eta {}", brief_duration(estimate)))
        .unwrap_or_default();
    let clock = elapsed_hms(elapsed);
    let bar = progress_glyphs(pos, len, 12);
    format!(
        "sync {name:<12} [{}] [{}] {pos}/{len} sessions{eta} {tail}",
        clock, bar,
    )
}
/// Renders an integer with `,` separators between groups of three digits.
fn format_thousands(value: u64) -> String {
    let digits = value.to_string();
    // A comma precedes every digit whose distance from the end is a multiple
    // of three, i.e. whose index is congruent to the length modulo three.
    let lead = digits.len() % 3;
    let mut grouped = String::with_capacity(digits.len() + digits.len() / 3);
    for (idx, digit) in digits.char_indices() {
        if idx != 0 && idx % 3 == lead {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
/// Formats a byte count with binary (1024-based) units and roughly three
/// significant digits: `1.50 KiB`, `10.0 KiB`, `100 KiB`.
///
/// Values below 1024 are printed exactly as `N B`. The largest unit is PiB,
/// so anything beyond that range is shown as a (large) PiB figure.
///
/// Rounding is applied before the unit and precision are chosen, so a value
/// that rounds up to the next boundary is shown in the next tier
/// (`1.00 MiB`, never `1024 KiB`; `10.0 KiB`, never `10.00 KiB`).
fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 6] = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"];
    const LAST_UNIT: usize = UNITS.len() - 1;
    if bytes < 1024 {
        return format!("{bytes} B");
    }
    let mut value = bytes as f64;
    let mut unit = 0;
    while value >= 1024.0 && unit < LAST_UNIT {
        value /= 1024.0;
        unit += 1;
    }
    // A value such as 1023.999 would print as "1024" with zero decimals;
    // promote it to the next unit instead.
    if value.round() >= 1024.0 && unit < LAST_UNIT {
        value /= 1024.0;
        unit += 1;
    }
    // Pick the precision from the value as it will look after rounding, so
    // 9.999 is shown as "10.0" and 99.99 as "100".
    let decimals: usize = if value >= 99.95 {
        0
    } else if value >= 9.995 {
        1
    } else {
        2
    };
    format!("{:.*} {}", decimals, value, UNITS[unit])
}
/// Asks the handler layer for the query plan of `request` and returns it as
/// text. A handler-side error envelope is converted into an `anyhow` error
/// carrying the envelope's `Debug` rendering.
async fn explain_search(
    store: &Store,
    embedder: &LazyEmbedder,
    request: &SearchRequest,
    search: &config::SearchConfig,
) -> anyhow::Result<String> {
    let plan = handlers::explain_search_plan(store, embedder, request.clone(), search).await;
    match plan {
        Ok(text) => Ok(text),
        Err(envelope) => Err(anyhow::anyhow!("{envelope:?}")),
    }
}
/// Creates the spinner used while `optimize` runs, together with the progress
/// callback that feeds it.
///
/// The callback shows the current table/phase (and index-build stage) as the
/// spinner message; phase completions are only logged at debug level under the
/// `pond::perf` target. The returned bar is a handle to the same spinner.
fn optimize_progress_bar() -> (OptimizeProgressFn, ProgressBar) {
    let style = ProgressStyle::with_template("{spinner:.green} {elapsed_precise} {wide_msg}")
        .unwrap_or_else(|_| ProgressStyle::default_spinner());
    let bar = ProgressBar::new_spinner();
    bar.set_style(style);
    bar.enable_steady_tick(Duration::from_millis(120));
    // The callback owns its own handle so the caller keeps `bar`.
    let spinner = bar.clone();
    let callback: OptimizeProgressFn = Arc::new(move |event| match event {
        OptimizeEvent::PhaseStart {
            table,
            phase,
            detail,
        } => {
            let label = if let Some(d) = detail {
                format!("{} {} ({d})", table.as_str(), phase.label())
            } else {
                format!("{} {}", table.as_str(), phase.label())
            };
            spinner.set_message(label);
        }
        OptimizeEvent::PhaseDone {
            table,
            phase,
            elapsed_ms,
        } => {
            tracing::debug!(
                target: "pond::perf",
                table = table.as_str(),
                phase = phase.label(),
                elapsed_ms,
                "index phase done"
            );
        }
        OptimizeEvent::IndexStage {
            table,
            index,
            stage,
            completed,
            total,
            unit,
        } => {
            // Only show "done/total" when a positive total is known.
            let progress = if let Some(t) = total.filter(|t| *t > 0) {
                format!("{completed}/{t} {unit}")
            } else {
                format!("{completed} {unit}")
            };
            let message = format!(
                "{} {} ({}): {} {}",
                table.as_str(),
                "index-build",
                index,
                stage,
                progress.trim_end(),
            );
            spinner.set_message(message);
        }
    });
    (callback, bar)
}
/// Prints follow-up hints and errors after an optimize run.
///
/// First pass: one hint per table whose compaction was skipped because of a
/// concurrent writer. Second pass: per table, index failures (red, with a
/// rebuild hint when the error is classified as an index error) followed by
/// compaction failures (yellow).
fn render_optimize_hints(outcome: &OptimizeOutcome) -> anyhow::Result<()> {
    use pond::output::{dim, paint, red, yellow};
    for entry in &outcome.tables {
        if !matches!(entry.compaction, PhaseOutcome::SkippedConflict) {
            continue;
        }
        let hint = format!(
            "{} compaction on {} deferred: concurrent writer; rerun once it finishes",
            paint("hint", dim()),
            entry.table.as_str(),
        );
        output(&hint)?;
    }
    for entry in &outcome.tables {
        if let PhaseOutcome::Failed(error) = &entry.indices {
            let message = format!("error indices on {}: {error:#}", entry.table.as_str());
            output(&paint(&message, red()))?;
            if pond::substrate::is_index_error(error) {
                let hint = format!(
                    "{} structural index fault; recover with `pond optimize --rebuild`",
                    paint("hint", dim()),
                );
                output(&hint)?;
            }
        }
        if let PhaseOutcome::Failed(error) = &entry.compaction {
            let message = format!("error compaction on {}: {error:#}", entry.table.as_str());
            output(&paint(&message, yellow()))?;
        }
    }
    Ok(())
}
/// Maps an index status to its machine-readable label: `not_built` when the
/// index does not exist, `pending` once the unindexed row count reaches the
/// fold threshold (treated as at least 1), and `ready` otherwise.
fn index_status_label(status: &IndexStatus, fold_threshold: u64) -> &'static str {
    if !status.exists {
        return "not_built";
    }
    let unindexed = status.unindexed_rows as u64;
    if unindexed >= fold_threshold.max(1) {
        "pending"
    } else {
        "ready"
    }
}
/// Builds the `status --json` document for a store that has no data yet:
/// version and storage location only, `local`/`hosts` set to null and
/// `initialized` set to false.
fn status_json_empty(resolved: &ResolvedStorage) -> anyhow::Result<String> {
    let storage = serde_json::json!({
        "url": resolved.display(),
        "binding": resolved.binding.describe(),
    });
    let doc = serde_json::json!({
        "pond_version": VERSION.as_str(),
        "storage": storage,
        "local": serde_json::Value::Null,
        "hosts": serde_json::Value::Null,
        "initialized": false,
    });
    serde_json::to_string_pretty(&doc).context("serialize status as JSON")
}
/// Builds the `status --json` document for an initialized store.
///
/// Sections: storage location and per-table sizes/rows, corpus totals, index
/// readiness labels, embedding progress (null when not collected), the local
/// host's adapters / last sync / schedule, and per-host activity (null when
/// `hosts` is `None`).
///
/// # Errors
/// Fails when the last-sync record or the final document cannot be serialized.
fn status_json(
    resolved: &ResolvedStorage,
    sizes: &TableSizes,
    checks: &StatusChecks,
    local: &LocalStatus,
    hosts: Option<&[pond::sessions::HostActivity]>,
) -> anyhow::Result<String> {
    let totals = checks.totals;
    let total_bytes = sizes.sessions + sizes.messages + sizes.parts + sizes.other;

    let mut indexes: Vec<serde_json::Value> = Vec::new();
    for status in checks.index_status {
        indexes.push(serde_json::json!({
            "name": format!("{}.{}", status.table.as_str(), status.intent_name),
            "status": index_status_label(
                status,
                pond::substrate::DEFAULT_SYNC_INDEX_FOLD_ROWS as u64,
            ),
        }));
    }

    let embedding_doc = checks.embedding.as_ref().map(|progress| {
        serde_json::json!({
            "embedded": progress.embedded,
            "eligible": progress.total,
            "backlog": progress.backlog,
        })
    });

    let mut local_adapters: Vec<serde_json::Value> = Vec::new();
    for adapter in &local.adapters {
        local_adapters.push(serde_json::json!({
            "name": adapter.name,
            "sources": adapter.sources,
            "fresh": adapter.plan.map(|plan| plan.fresh),
            "pending": adapter.plan.map(|plan| plan.pending),
        }));
    }

    let hosts_doc: Option<Vec<serde_json::Value>> = hosts.map(|known| {
        known
            .iter()
            .map(|host| {
                serde_json::json!({
                    "hostname": host.hostname,
                    "sessions": host.sessions,
                    "last_activity_at": host.last_message_at.to_rfc3339(),
                })
            })
            .collect()
    });

    // The only fallible piece besides the final serialization; resolve it up
    // front so the document literal below stays declarative.
    let last_sync = local
        .last_sync
        .as_ref()
        .map(serde_json::to_value)
        .transpose()
        .context("serialize last-sync record")?;

    let doc = serde_json::json!({
        "pond_version": VERSION.as_str(),
        "storage": {
            "url": resolved.display(),
            "binding": resolved.binding.describe(),
            "total_bytes": total_bytes,
            "tables": {
                "sessions": { "bytes": sizes.sessions, "rows": totals.sessions },
                "messages": { "bytes": sizes.messages, "rows": totals.messages },
                "parts": { "bytes": sizes.parts, "rows": totals.parts },
            },
        },
        "corpus": {
            "sessions": totals.sessions,
            "messages": totals.messages,
            "parts": totals.parts,
        },
        "indexes": indexes,
        "embedding": embedding_doc,
        "source_agents": checks.adapter_count,
        "local": {
            "host": local.hostname,
            "adapters": local_adapters,
            "pending_known": local.pending_known,
            "last_sync": last_sync,
            "next_scheduled_run_secs": local.next_run_secs,
        },
        "hosts": hosts_doc,
        "schedule": {
            "active": local.schedule.active,
            "backend": local.schedule.backend,
            "every": local.schedule.every.map(|every| every.label()),
        },
        "initialized": true,
    });
    serde_json::to_string_pretty(&doc).context("serialize status as JSON")
}
/// Prints the bold status title followed by the storage line: the resolved
/// store location and, dimmed in brackets, how its credentials are bound.
fn render_status_storage_line(title: &str, resolved: &ResolvedStorage) -> anyhow::Result<()> {
    use pond::output::{bold, dim, paint};
    output(&paint(title, bold()))?;
    let label = paint("storage", dim());
    let location = resolved.display();
    let binding = paint(&format!("[{}]", resolved.binding.describe()), dim());
    output(&format!("{} {} {}", label, location, binding))?;
    Ok(())
}
/// Prints the status view for a store without data: the storage header plus a
/// "no data yet" line that suggests `pond sync` when adapters are configured
/// and `pond init` when they are not.
fn render_empty_status(
    title: &str,
    resolved: &ResolvedStorage,
    has_adapters: bool,
) -> anyhow::Result<()> {
    use pond::output::{dim, paint};
    render_status_storage_line(title, resolved)?;
    let fix = match has_adapters {
        true => "run `pond sync` to import sessions",
        false => "run `pond init` to set up adapters and import sessions",
    };
    let line = format!("{} no data yet - {fix}", paint("stored", dim()));
    output(&line)?;
    Ok(())
}
/// Prints the storage header followed by a per-table size breakdown
/// (sessions, messages, parts, other) and a bold total row.
///
/// A table additionally gets a "pending cleanup" note when its dead bytes
/// exceed 64 MiB and make up more than a tenth of its on-disk size.
fn render_status_header(
    title: &str,
    resolved: &ResolvedStorage,
    sizes: &TableSizes,
    totals: &RowTotals,
) -> anyhow::Result<()> {
    render_status_storage_line(title, resolved)?;
    let breakdown = [
        (
            "sessions",
            sizes.sessions,
            Some(totals.sessions),
            sizes.sessions_data,
        ),
        (
            "messages",
            sizes.messages,
            Some(totals.messages),
            sizes.messages_data,
        ),
        ("parts", sizes.parts, Some(totals.parts), sizes.parts_data),
        // "other" has no row count and no dead-byte accounting.
        ("other", sizes.other, None, Default::default()),
    ];
    let mut table = new_table();
    for (label, bytes, row_count, data) in breakdown {
        let dead_note = match data.dead() {
            Some(dead) if dead > 64 * 1024 * 1024 && dead * 10 > data.on_disk => {
                format!("{} pending cleanup", format_bytes(dead))
            }
            _ => String::new(),
        };
        let rows_text = match row_count {
            Some(n) => format!("{} rows", format_thousands(n)),
            None => String::new(),
        };
        let label_cell = Cell::new(format!(" {label}"));
        let bytes_cell = Cell::new(format_bytes(bytes)).set_alignment(CellAlignment::Right);
        let rows_cell = Cell::new(rows_text).set_alignment(CellAlignment::Right);
        let note_cell = Cell::new(dead_note).set_alignment(CellAlignment::Right);
        table.add_row(vec![label_cell, bytes_cell, rows_cell, note_cell]);
    }
    let total_bytes = sizes.sessions + sizes.messages + sizes.parts + sizes.other;
    let total_label = Cell::new(" total").add_attribute(Attribute::Bold);
    let total_value = Cell::new(format_bytes(total_bytes))
        .set_alignment(CellAlignment::Right)
        .add_attribute(Attribute::Bold);
    table.add_row(vec![total_label, total_value, Cell::new(""), Cell::new("")]);
    output(&table.to_string())?;
    Ok(())
}
/// Store-wide health inputs shared by the text and JSON status renderers.
struct StatusChecks<'a> {
/// Row counts per table; reported as the corpus totals.
totals: &'a RowTotals,
/// Number of source agents in the store (emitted as `source_agents` in JSON).
adapter_count: usize,
/// Per-index status entries used to derive the readiness labels.
index_status: &'a [IndexStatus],
/// Embedding progress; when `None` the text renderer prints a hint to rerun
/// with `-v` and the JSON document carries `null`.
embedding: Option<EmbeddingProgress>,
}
/// Prints the store health section: the index readiness line, the stored
/// session/message counts and the number of source agents.
///
/// With embedding progress available the message count is the searchable
/// total; without it the raw message row count is shown and a hint about `-v`
/// goes to stderr.
fn render_status_checks(checks: &StatusChecks) -> anyhow::Result<()> {
    use pond::output::{dim, paint};
    let embedding = checks.embedding.as_ref();
    output("")?;
    let health = classify_index_health(
        checks.index_status,
        embedding,
        pond::substrate::DEFAULT_SYNC_INDEX_FOLD_ROWS as u64,
    );
    output(&render_indexes_line(&health))?;

    let messages_label = if let Some(progress) = embedding {
        format!(
            "{} searchable messages",
            format_thousands(progress.total as u64)
        )
    } else {
        format!("{} messages", format_thousands(checks.totals.messages))
    };
    let stored_line = format!(
        "{} {} sessions, {}",
        paint("stored", dim()),
        format_thousands(checks.totals.sessions),
        messages_label,
    );
    output(&stored_line)?;

    let noun = match checks.adapter_count {
        1 => "source agent",
        _ => "source agents",
    };
    let agents_line = format!(
        "{} {} {noun} in this store",
        paint("agents", dim()),
        checks.adapter_count,
    );
    output(&agents_line)?;

    if embedding.is_none() {
        let hint = paint(
            "(use -v for searchable message count + embedding backlog)",
            dim(),
        );
        output_err(&hint)?;
    }
    Ok(())
}
/// What `local_status` learned about one configured adapter on this host.
struct LocalAdapterStatus {
/// Adapter name as configured.
name: String,
/// Number of sources found; `None` when the adapter could not be opened or
/// discovery failed (rendered as "source path unreadable").
sources: Option<usize>,
/// Freshness preview from the adapter's `plan`; `None` when no cached rowmap
/// was available or the adapter returned no plan.
plan: Option<pond::adapter::SyncPlan>,
}
/// Snapshot of this host's sync state, built by `local_status` and consumed by
/// the text and JSON status renderers.
struct LocalStatus {
/// This machine's hostname, if it could be determined.
hostname: Option<String>,
/// One entry per configured adapter that has a known factory.
adapters: Vec<LocalAdapterStatus>,
/// Whether a cached rowmap was available, i.e. whether pending counts could
/// be computed at all.
pending_known: bool,
/// The last recorded sync for this store, if any.
last_sync: Option<syncstate::LastSyncRecord>,
/// Scheduler state passed in by the caller.
schedule: crate::schedule::ScheduleSnapshot,
/// Seconds until the next scheduled run is expected (last finish time plus
/// the schedule interval, minus now); zero or negative means it is due.
next_run_secs: Option<i64>,
}
/// Collects this host's sync state for the status command.
///
/// For every configured adapter with a known factory it records the number of
/// sources and, when a cached rowmap is available, the adapter's freshness
/// plan. Failures are deliberately best-effort: an unresolvable adapter list
/// yields no adapters, and an adapter that cannot be opened is recorded with
/// unknown sources. The next-run estimate needs both an active schedule
/// interval and a recorded last sync.
async fn local_status(
    loaded: &Config,
    store: &Store,
    store_key: &str,
    schedule: crate::schedule::ScheduleSnapshot,
) -> LocalStatus {
    let active_every = schedule.every;
    let hostname = whoami::hostname().ok();
    let rowmap = store.open_cached_rowmap(&default_cache_dir());
    let pending_known = rowmap.is_some();
    let oracle = pond::sessions::RowmapOracle(rowmap);

    let configured = loaded.resolve_adapters(None).unwrap_or_default();
    let mut adapters = Vec::new();
    for (name, blob) in configured {
        let Some(factory) = adapter::by_name(&name) else {
            continue;
        };
        let Ok(opened) = factory.open(blob) else {
            adapters.push(LocalAdapterStatus {
                name,
                sources: None,
                plan: None,
            });
            continue;
        };
        // A plan is only meaningful when there is a rowmap to compare against.
        let plan = match pending_known {
            true => opened.plan(&oracle).await.ok().flatten(),
            false => None,
        };
        // Prefer the plan's source count; fall back to a plain discovery.
        let sources = match &plan {
            Some(known) => Some(known.sources),
            None => opened.discover().await.ok(),
        };
        adapters.push(LocalAdapterStatus {
            name,
            sources,
            plan,
        });
    }

    let last_sync = syncstate::read_last_sync(store_key);
    let next_run_secs = match (active_every, last_sync.as_ref()) {
        (Some(every), Some(record)) => {
            let due = record.finished_at + chrono::Duration::seconds(i64::from(every.secs()));
            Some((due - Utc::now()).num_seconds())
        }
        _ => None,
    };

    LocalStatus {
        hostname,
        adapters,
        pending_known,
        last_sync,
        schedule,
        next_run_secs,
    }
}
/// Prints the local-host section of `status`: hostname, one line per adapter
/// (sources and pending-sync state), the last-sync summary and the schedule
/// line with the expected next run.
///
/// The last-sync record is read from disk, so its duration is converted
/// defensively: a negative, non-finite or out-of-range `duration_secs` is
/// shown as a zero duration instead of panicking.
///
/// # Errors
/// Propagates failures from writing to stdout/stderr.
fn render_local_status(local: &LocalStatus) -> anyhow::Result<()> {
    use pond::output::{dim, green, paint, red};
    output("")?;
    if let Some(hostname) = &local.hostname {
        output(&format!("{} {hostname}", paint("host", dim())))?;
    }
    // Pad adapter names to the longest one plus a two-column gutter.
    let name_width = local
        .adapters
        .iter()
        .map(|adapter| adapter.name.len())
        .max()
        .unwrap_or(0)
        + 2;
    for (index, adapter) in local.adapters.iter().enumerate() {
        // Only the first adapter row carries the "local" section label.
        let label = if index == 0 {
            paint("local", dim()) + " "
        } else {
            " ".to_owned()
        };
        let detail = match (adapter.sources, &adapter.plan) {
            (None, _) => paint(
                &format!(
                    "source path unreadable - check [adapters.{}] in config",
                    adapter.name
                ),
                red(),
            ),
            (Some(sources), Some(plan)) if plan.pending == 0 => format!(
                "{} sources - {}",
                format_thousands(sources as u64),
                paint("up to date", green()),
            ),
            (Some(sources), Some(plan)) => format!(
                "{} sources - {}",
                format_thousands(sources as u64),
                paint(
                    &format!("{} pending sync", format_thousands(plan.pending as u64)),
                    pond::output::yellow(),
                ),
            ),
            (Some(sources), None) if local.pending_known => format!(
                "{} sources {}",
                format_thousands(sources as u64),
                paint("(pending unknown - no cheap freshness preview)", dim()),
            ),
            (Some(sources), None) => format!("{} sources", format_thousands(sources as u64)),
        };
        output(&format!(
            "{label}{:<name_width$}{detail}",
            adapter.name,
            name_width = name_width,
        ))?;
    }
    if !local.adapters.is_empty() && !local.pending_known {
        output_err(&paint(
            "(pending-sync counts appear after the first `pond sync` on this host)",
            dim(),
        ))?;
    }
    let last_sync_line = match &local.last_sync {
        None if local.schedule.active => paint(
            "never recorded on this host - the scheduled sync hasn't completed yet (`pond schedule logs`), or run `pond sync` now",
            dim(),
        ),
        None => paint("never on this host - run `pond sync`", dim()),
        Some(record) => {
            let ago = ago(record.finished_at);
            match record.outcome {
                syncstate::SyncOutcome::Ok => {
                    // `from_secs_f64` panics on negative/NaN/overflowing input;
                    // a damaged record must not take the status command down.
                    let took = Duration::try_from_secs_f64(record.duration_secs)
                        .unwrap_or_default();
                    format!(
                        "{ago} ago - {}: +{} sessions, +{} searchable messages in {}",
                        paint("ok", green()),
                        format_thousands(record.sessions_inserted),
                        format_thousands(record.messages_inserted),
                        brief_duration(took),
                    )
                }
                syncstate::SyncOutcome::Error => {
                    // Only the first line of a multi-line error fits the summary.
                    let reason = record
                        .error
                        .as_deref()
                        .and_then(|error| error.lines().next())
                        .unwrap_or("unknown error");
                    format!(
                        "{ago} ago - {}: {reason} (pond schedule logs)",
                        paint("FAILED", red()),
                    )
                }
            }
        }
    };
    output(&format!("{} {last_sync_line}", paint("last sync", dim())))?;
    let mut schedule_line = local.schedule.line.clone();
    if let Some(next_run_secs) = local.next_run_secs {
        // A non-positive value means the run is already due; the cast below is
        // therefore only reached for strictly positive values.
        let suffix = if next_run_secs <= 0 {
            " - next run expected about now".to_owned()
        } else {
            format!(
                " - next run expected within {}",
                brief_duration(Duration::from_secs(next_run_secs as u64)),
            )
        };
        schedule_line.push_str(&suffix);
    }
    output(&schedule_line)?;
    Ok(())
}
/// Prints the "hosts" table: one row per host with its session count and how
/// long ago it was last active, most recently active host first. Hosts
/// without a recorded hostname are shown as "(unstamped)".
fn render_host_activity(hosts: &[pond::sessions::HostActivity]) -> anyhow::Result<()> {
    use pond::output::{dim, paint};
    output("")?;
    output(&paint("hosts", dim()))?;
    let mut by_recency: Vec<&pond::sessions::HostActivity> = hosts.iter().collect();
    // Stable sort, newest activity first.
    by_recency.sort_by(|left, right| right.last_message_at.cmp(&left.last_message_at));
    let mut table = new_table();
    for host in by_recency {
        let age = ago(host.last_message_at);
        let name = host.hostname.as_deref().unwrap_or("(unstamped)");
        let sessions = format_thousands(host.sessions as u64);
        let name_cell = Cell::new(format!(" {}", name));
        let sessions_cell =
            Cell::new(format!("{} sessions", sessions)).set_alignment(CellAlignment::Right);
        let activity_cell =
            Cell::new(format!("last activity {age} ago")).set_alignment(CellAlignment::Right);
        table.add_row(vec![name_cell, sessions_cell, activity_cell]);
    }
    output(&table.to_string())?;
    Ok(())
}
/// Readiness of one search index as shown on the status "indexes" line.
#[derive(Debug, Clone)]
enum IndexHealthState {
/// The index does not exist (also the default when no status entry matches).
NotBuilt,
/// The index exists and its unindexed row count is below the fold threshold.
Ready,
/// Work is outstanding; carries the unindexed row count or, for the semantic
/// index, the embedding backlog.
Pending(u64),
}
/// Combined readiness of the two message indexes.
#[derive(Debug, Clone)]
struct IndexHealth {
/// State of the index named `MESSAGES_FTS_INDEX`.
text: IndexHealthState,
/// State of the index named `MESSAGES_VECTOR_INDEX`.
semantic: IndexHealthState,
}
fn classify_index_health(
statuses: &[IndexStatus],
embedding: Option<&EmbeddingProgress>,
fold_threshold: u64,
) -> IndexHealth {
use IndexHealthState::*;
fn classify_one(status: &IndexStatus, fold_threshold: u64) -> IndexHealthState {
if !status.exists {
return NotBuilt;
}
if (status.unindexed_rows as u64) < fold_threshold.max(1) {
Ready
} else {
Pending(status.unindexed_rows as u64)
}
}
let mut text = NotBuilt;
let mut semantic = NotBuilt;
for status in statuses {
match status.intent_name.as_str() {
MESSAGES_FTS_INDEX => text = classify_one(status, fold_threshold),
MESSAGES_VECTOR_INDEX => semantic = classify_one(status, fold_threshold),
_ => {}
}
}
if let Some(progress) = embedding
&& progress.backlog > 0
&& matches!(semantic, Ready)
{
semantic = Pending(progress.backlog as u64);
}
IndexHealth { text, semantic }
}
/// Formats the status "indexes" line.
///
/// Both indexes ready collapses to a single phrase; otherwise each index is
/// described separately. A missing semantic index is still reported as ready
/// (brute-force search). The label turns yellow while anything is pending.
fn render_indexes_line(health: &IndexHealth) -> String {
    use IndexHealthState::*;
    use pond::output::{dim, paint, yellow};
    let body = if let (Ready, Ready) = (&health.text, &health.semantic) {
        String::from("text + semantic ready")
    } else {
        let text_part = match &health.text {
            NotBuilt => "text not built".to_owned(),
            Ready => "text ready".to_owned(),
            Pending(n) => format!("text {} pending", format_thousands(*n)),
        };
        let semantic_part = match &health.semantic {
            NotBuilt => "semantic ready (brute-force; index builds at scale)".to_owned(),
            Ready => "semantic ready".to_owned(),
            Pending(n) => format!("semantic {} pending", format_thousands(*n)),
        };
        format!("{text_part} . {semantic_part}")
    };
    let any_pending = [&health.text, &health.semantic]
        .into_iter()
        .any(|state| matches!(state, Pending(_)));
    let tint = if any_pending { yellow() } else { dim() };
    let label = paint("indexes", tint);
    format!("{label} {body}")
}
/// Creates a borderless table (the `NOTHING` preset) with dynamic content
/// arrangement, the shared starting point for the status tables.
fn new_table() -> Table {
    let mut borderless = Table::new();
    borderless.load_preset(NOTHING);
    borderless.set_content_arrangement(ContentArrangement::Dynamic);
    borderless
}
fn render_search_envelope(
format: OutputFormat,
envelope: &SearchEnvelope,
request: &SearchRequest,
) -> anyhow::Result<bool> {
match format {
OutputFormat::Json => {
output(
&serde_json::to_string_pretty(envelope)
.context("serialize search envelope as JSON")?,
)?;
Ok(matches!(envelope, SearchEnvelope::Success(_)))
}
OutputFormat::Text => match envelope {
SearchEnvelope::Success(response) => {
let transcript = pond::render::render_search_transcript(
response,
request,
pond::render::Surface::Cli,
);
output(transcript.trim_end_matches('\n'))?;
Ok(true)
}
SearchEnvelope::Error(error) => {
render_error_pretty(error);
Ok(false)
}
},
}
}
fn render_get_envelope(
format: OutputFormat,
envelope: &GetEnvelope,
request: &GetRequest,
subagents_footer: &str,
) -> anyhow::Result<bool> {
match format {
OutputFormat::Json => {
output(
&serde_json::to_string_pretty(envelope)
.context("serialize get envelope as JSON")?,
)?;
Ok(matches!(envelope, GetEnvelope::Success(_)))
}
OutputFormat::Text => match envelope {
GetEnvelope::Success(response) => {
let transcript = pond::render::render_get_transcript(
response,
request,
pond::render::Surface::Cli,
);
let combined = format!("{transcript}{subagents_footer}");
output(combined.trim_end_matches('\n'))?;
Ok(true)
}
GetEnvelope::Error(error) => {
render_error_pretty(error);
Ok(false)
}
},
}
}
/// Rewrites protocol-facing names in a SQL error message into their CLI
/// equivalents (schema resource and tool names become `pond …` commands).
fn sql_error_for_cli(message: &str) -> String {
    // Applied in order: the longer "resource schema://…" form must be handled
    // before the bare URI it contains.
    const REWRITES: [(&str, &str); 5] = [
        ("resource schema://pond-sql", "`pond sql --help`"),
        ("schema://pond-sql", "`pond sql --help`"),
        ("pond_sql_query", "`pond sql`"),
        ("pond_search", "`pond search`"),
        ("pond_get", "`pond get`"),
    ];
    let mut text = message.to_owned();
    for (needle, replacement) in REWRITES {
        text = text.replace(needle, replacement);
    }
    text
}
/// Prints an error envelope to stderr: a colored `error <code> <message>`
/// line, a hint about where ids come from for `not_found`, and the details
/// payload unless it is null or an empty object.
fn render_error_pretty(error: &ErrorEnvelope) {
    use pond::output::{bold, dim, paint, red};
    let body = &error.error;
    let code = match body.code {
        wire::ErrorCode::ValidationFailed => "validation_failed",
        wire::ErrorCode::VersionUnsupported => "version_unsupported",
        wire::ErrorCode::NotFound => "not_found",
        wire::ErrorCode::NamespaceUnknown => "namespace_unknown",
        wire::ErrorCode::StorageUnavailable => "storage_unavailable",
        wire::ErrorCode::Conflict => "conflict",
        wire::ErrorCode::Internal => "internal",
    };
    eprintln!(
        "{} {} {}",
        paint("error", red().bold()),
        paint(code, bold()),
        body.message,
    );
    if let wire::ErrorCode::NotFound = body.code {
        let hint = paint(
            " hint: message and session ids come from `pond search` output",
            dim(),
        );
        eprintln!("{}", hint);
    }
    let details = &body.details;
    let details_absent = details.is_null() || details.as_object().is_some_and(|map| map.is_empty());
    if !details_absent {
        let rendered = paint(&format!(" details: {}", details), dim());
        eprintln!("{}", rendered);
    }
}
#[cfg(test)]
mod tests {
#![allow(clippy::expect_used, clippy::unwrap_used)]
use super::*;
// Pins `brief_duration` output across unit boundaries: seconds, minutes,
// hours with and without a minute remainder, and days (where the trailing
// hour is dropped).
#[test]
fn brief_duration_picks_the_readable_unit() {
assert_eq!(brief_duration(Duration::from_secs(42)), "42s");
assert_eq!(brief_duration(Duration::from_secs(12 * 60)), "12m");
assert_eq!(
brief_duration(Duration::from_secs(3 * 3600 + 20 * 60)),
"3h 20m"
);
assert_eq!(brief_duration(Duration::from_secs(4 * 3600)), "4h");
assert_eq!(brief_duration(Duration::from_secs(2 * 86_400 + 3600)), "2d");
}
// The ETA suffix must appear only when an estimate exists, progress is
// partial, the total is known and enough time has elapsed.
#[test]
fn sync_status_line_gains_an_eta_only_once_rate_is_meaningful() {
let estimate = Some(Duration::from_secs(90));
// Too early (2s elapsed): no ETA yet.
let line = sync_status_line(
"claude-code",
10,
100,
Duration::from_secs(2),
estimate,
"t",
);
assert!(!line.contains("eta"), "got: {line}");
// Partial progress after 10s: ETA shown.
let line = sync_status_line(
"claude-code",
10,
100,
Duration::from_secs(10),
estimate,
"t",
);
assert!(line.contains("eta 1m"), "got: {line}");
// Finished (pos == len): nothing left to estimate.
let line = sync_status_line(
"claude-code",
100,
100,
Duration::from_secs(10),
estimate,
"t",
);
assert!(!line.contains("eta"), "got: {line}");
// Zero length: no ETA.
let line = sync_status_line("claude-code", 3, 0, Duration::from_secs(10), estimate, "t");
assert!(!line.contains("eta"), "got: {line}");
// No estimate supplied: no ETA.
let line = sync_status_line("claude-code", 10, 100, Duration::from_secs(10), None, "t");
assert!(!line.contains("eta"), "got: {line}");
}
// `redact_config_value` must mask inline secrets (key id, secret key, SAS
// token) while leaving `_file` / `_command` indirections and non-secret
// settings readable.
#[test]
fn redaction_masks_secret_fields_but_spares_file_and_command_variants() {
assert_eq!(
redact_config_value("creds.work.access_key_id", "AKIA"),
"********"
);
assert_eq!(
redact_config_value("creds.work.secret_access_key", "s"),
"********"
);
assert_eq!(
redact_config_value("creds.work.extra.sas_token", "t"),
"********"
);
assert_eq!(
redact_config_value("creds.work.extra.request_timeout", "60s"),
"60s"
);
assert_eq!(
redact_config_value("creds.work.access_key_id_file", "/k"),
"/k"
);
assert_eq!(
redact_config_value("creds.work.secret_access_key_command", "op read x"),
"op read x"
);
assert_eq!(redact_config_value("creds.work.region", "fsn1"), "fsn1");
}
// `flatten_config` must emit one dotted-key row per leaf, descend into nested
// tables, and omit keys whose value is null.
#[test]
fn flatten_config_emits_dotted_leaves_and_skips_nulls() {
let value = serde_json::json!({
"storage": {"path": "/p"},
"search": {"nprobes": null},
"creds": {"work": {"region": "r", "extra": {"a": "b"}}},
});
let mut rows = Vec::new();
flatten_config(String::new(), &value, &mut rows);
assert!(rows.contains(&("storage.path".to_owned(), "/p".to_owned())));
assert!(rows.contains(&("creds.work.extra.a".to_owned(), "b".to_owned())));
assert!(rows.iter().all(|(key, _)| key != "search.nprobes"));
}
// Resolution precedence for the store location: an explicit flag beats the
// configured path, and with neither the result is a local store.
#[test]
fn storage_location_resolves_flag_then_config_then_default() {
let mut loaded = Config::default();
let resolved = resolve_storage_location(None, &loaded).unwrap();
assert!(resolved.is_local());
loaded.storage.path = Some("s3://from-config/p".to_owned());
let resolved = resolve_storage_location(None, &loaded).unwrap();
assert_eq!(resolved.lance_url().as_str(), "s3://from-config/p");
let flag = StorageUrl::parse("s3://from-flag/p").unwrap();
let resolved = resolve_storage_location(Some(flag), &loaded).unwrap();
assert_eq!(resolved.lance_url().as_str(), "s3://from-flag/p");
}
// The bare keywords `local` and `DEFAULT` both resolve to the default local
// store, while the literal path `./local` stays a distinct location.
#[test]
fn parse_storage_path_local_keyword_resolves_to_the_default_dir() {
let keyword = parse_storage_path("local").unwrap();
assert!(keyword.is_local());
assert_eq!(
keyword.canonical(),
default_local_storage().unwrap().canonical(),
);
assert_eq!(
parse_storage_path("DEFAULT").unwrap().canonical(),
keyword.canonical(),
);
let literal = parse_storage_path("./local").unwrap();
assert_ne!(literal.canonical(), keyword.canonical());
}
// Runs clap's built-in consistency assertions over the whole command tree.
#[test]
fn cli_parses_and_subcommands_are_wired() {
Cli::command().debug_assert();
}
// Credential sets written by `set_creds_set` must load back through `Config`,
// keep optional scope/region, and be replaced in place (not duplicated) when
// the same name is written again.
#[test]
fn set_creds_set_round_trips_through_config() {
let mut doc: toml_edit::DocumentMut =
"[storage]\npath = \"/tmp/p\"\n".parse().expect("parse");
set_creds_set(&mut doc, "default", "AKIA", "shh", None, None);
set_creds_set(
&mut doc,
"work",
"AKIB",
"shh2",
Some("s3+https://host/work"),
Some("fsn1"),
);
let config = Config::load_str(&doc.to_string()).expect("valid config");
let default = &config.creds["default"];
assert_eq!(default.access_key_id.as_deref(), Some("AKIA"));
assert_eq!(default.secret_access_key.as_deref(), Some("shh"));
assert!(default.scope.is_none());
let work = &config.creds["work"];
assert_eq!(work.scope.as_deref(), Some("s3+https://host/work"));
assert_eq!(work.region.as_deref(), Some("fsn1"));
// Re-writing an existing set replaces it rather than adding a third entry.
set_creds_set(&mut doc, "default", "AKIC", "shh3", None, None);
let config = Config::load_str(&doc.to_string()).expect("valid after replace");
assert_eq!(config.creds.len(), 2);
assert_eq!(
config.creds["default"].access_key_id.as_deref(),
Some("AKIC")
);
}
// `creds_delete` must fail for an unknown set, remove an existing one, and
// leave no `creds` text behind once the last set is gone.
#[test]
fn creds_delete_removes_set_drops_empty_parent_and_errors_when_absent() {
let dir = tempfile::tempdir().expect("tempdir");
let config_file = dir.path().join("config.toml");
let mut doc: toml_edit::DocumentMut = String::new().parse().expect("parse empty");
set_creds_set(&mut doc, "default", "AKIA", "shh", None, None);
write_config_doc(&config_file, &doc).expect("write");
assert!(
creds_delete(&config_file, "missing".to_owned()).is_err(),
"deleting an absent set must error"
);
creds_delete(&config_file, "default".to_owned()).expect("delete present set");
let body = std::fs::read_to_string(&config_file).expect("read back");
assert!(
!body.contains("creds"),
"emptied [creds] parent removed: {body:?}"
);
}
// The adapters JSON listing must expose both `configured` and `detected`
// keys, and a configured adapter must carry its name, enabled flag and path.
#[test]
fn adapters_list_json_carries_configured_and_detected_keys() {
let config =
Config::load_str("[adapters.claude-code]\nenabled = true\npath = \"/tmp/cc\"\n")
.expect("configured");
let json = adapters_list_json(&config).expect("json builds");
let value: serde_json::Value = serde_json::from_str(&json).expect("valid json");
assert!(value.get("configured").is_some(), "configured key present");
assert!(value.get("detected").is_some(), "detected key present");
let first = &value["configured"][0];
assert_eq!(first["name"], "claude-code");
assert_eq!(first["enabled"], true);
assert_eq!(first["path"], "/tmp/cc");
}
// The credentials JSON listing must never contain the real access key or
// secret; both fields are replaced by a fixed mask.
#[test]
fn creds_list_json_redacts_the_secret() {
let config = Config::load_str(
"[creds.work]\naccess_key_id = \"AKIASECRETKEY\"\nsecret_access_key = \"topsecretvalue\"\n",
)
.expect("configured");
let json = creds_list_json(&config).expect("json builds");
assert!(
!json.contains("topsecretvalue"),
"real secret must never appear: {json}"
);
assert!(
!json.contains("AKIASECRETKEY"),
"real access key must never appear: {json}"
);
let value: serde_json::Value = serde_json::from_str(&json).expect("valid json");
assert_eq!(value[0]["name"], "work");
assert_eq!(value[0]["access_key"], "********");
assert_eq!(value[0]["secret"], "********");
}
// Error messages from `resolve_sync_adapters` must name the command that
// fixes the problem, and a successful resolution must strip the `enabled`
// discriminator from the blob handed to the adapter factory.
#[test]
fn resolve_sync_adapters_errors_point_at_the_adapters_commands() {
// No adapters configured at all.
let empty = Config::load_str("").expect("empty config");
let err = resolve_sync_adapters(&empty, None, None).expect_err("empty must error");
assert!(
err.to_string().contains("pond adapters discover"),
"empty error should name discover: {err}"
);
// A named adapter that is not configured.
let err = resolve_sync_adapters(&empty, Some("claude-code"), None)
.expect_err("unconfigured named must error");
assert!(
err.to_string().contains("pond adapters enable claude-code"),
"named error should name enable: {err}"
);
// A configured, enabled adapter resolves.
let configured =
Config::load_str("[adapters.claude-code]\nenabled = true\npath = \"/tmp/cc\"\n")
.expect("configured");
let resolved =
resolve_sync_adapters(&configured, Some("claude-code"), None).expect("resolves");
assert_eq!(resolved.len(), 1);
assert_eq!(resolved[0].0, "claude-code");
assert!(
resolved[0].1.get("enabled").is_none(),
"enabled discriminator must not reach the factory blob",
);
// A configured but disabled adapter requested by name.
let disabled =
Config::load_str("[adapters.claude-code]\nenabled = false\npath = \"/tmp/cc\"\n")
.expect("disabled config");
let err = resolve_sync_adapters(&disabled, Some("claude-code"), None)
.expect_err("disabled named must error");
assert!(
err.to_string().contains("disabled"),
"disabled error should say disabled: {err}"
);
assert!(
err.to_string().contains("pond adapters enable claude-code"),
"disabled error should name the enable verb: {err}"
);
}
/// Snapshots the long help text of the root command and of every visible
/// subcommand, so that any change to the rendered help shows up as a
/// snapshot diff.
///
/// Snapshot names are `help_root` for the root and `help_{name}` for each
/// subcommand.
#[test]
fn help_snapshots() {
let mut root = Cli::command();
// NOTE(review): per the clap docs, `build` prepares the command tree for
// introspection before it is rendered and walked below.
root.build();
// The concrete version string is swapped for a fixed placeholder before the
// root help is snapshotted - presumably so a version bump alone does not
// invalidate the snapshot.
let root_help = root
.render_long_help()
.to_string()
.replace(VERSION.as_str(), "[VERSION]");
insta::assert_snapshot!("help_root", root_help);
// Gather the names as owned strings first: the loop below calls
// `find_subcommand_mut` on `root`, which cannot happen while the
// `get_subcommands` iterator is still borrowing it.
// Hidden subcommands and the one named "help" are skipped.
let visible: Vec<String> = root
.get_subcommands()
.filter(|sub| !sub.is_hide_set() && sub.get_name() != "help")
.map(|sub| sub.get_name().to_owned())
.collect();
for name in visible {
let sub = root
.find_subcommand_mut(&name)
.expect("visible subcommand exists");
// Unlike the root help, subcommand help is snapshotted as rendered, with no
// version substitution.
insta::assert_snapshot!(format!("help_{name}"), sub.render_long_help().to_string());
}
}
/// Guards against adding a subcommand without mentioning it in the root help:
/// the name of every registered subcommand must occur somewhere in the
/// rendered long help.
#[test]
fn help_template_lists_every_subcommand() {
    let mut cli = Cli::command();
    cli.build();
    let rendered = cli.render_long_help().to_string();

    // Plain substring search; stops at the first subcommand that is absent.
    cli.get_subcommands()
        .map(|sub| sub.get_name())
        .for_each(|name| {
            assert!(
                rendered.contains(name),
                "subcommand `{name}` is missing from HELP_TEMPLATE - add a line for it",
            );
        });
}
/// Covers the endpoint keywords accepted by `resolve_copy_endpoint`:
/// `@` yields the store from the config unless an explicit storage flag is
/// given (then the flag's URL is used), and `local` yields a local store.
#[test]
fn copy_endpoint_at_keyword_resolves_to_the_configured_store() {
    let mut loaded = Config::default();
    loaded.storage.path = Some(String::from("s3://from-config/p"));
    let no_flag: Option<StorageUrl> = None;

    // True when the endpoint is a store whose Lance URL equals `expected`.
    let is_store_at = |endpoint: &CopyEndpoint, expected: &str| {
        matches!(endpoint, CopyEndpoint::Store(url) if url.lance_url().as_str() == expected)
    };

    // `@` with no flag falls back to the configured storage path.
    let via_config = resolve_copy_endpoint("@", &no_flag, &loaded).unwrap();
    assert!(is_store_at(&via_config, "s3://from-config/p"));

    // `@` with a flag resolves to the flag's URL instead.
    let flag = Some(StorageUrl::parse("s3://from-flag/p").unwrap());
    let via_flag = resolve_copy_endpoint("@", &flag, &loaded).unwrap();
    assert!(is_store_at(&via_flag, "s3://from-flag/p"));

    // `local` resolves to a store that reports itself as local.
    let local = resolve_copy_endpoint("local", &no_flag, &loaded).unwrap();
    assert!(matches!(&local, CopyEndpoint::Store(url) if url.is_local()));
}
/// `ensure_source_not_empty` must fail for a verification report whose only
/// table has zero source rows, and succeed once that table has rows.
#[test]
fn ensure_source_not_empty_rejects_a_zero_row_source() {
    // Builds a one-table report; only the source row count varies between
    // the two cases exercised below.
    let report_with_rows = |source_rows| StorageVerify {
        tables: vec![TableVerify {
            table: substrate::Table::Sessions,
            source_rows,
            missing: 0,
            dest_duplicates: 0,
        }],
        dest_indexes: IndexCoverage {
            fts_present: false,
            vector_present_or_below_activation: true,
        },
    };

    let empty = report_with_rows(0);
    assert!(ensure_source_not_empty(&empty, "s3://typo/bucket").is_err());

    let populated = report_with_rows(3);
    assert!(ensure_source_not_empty(&populated, "local").is_ok());
}
}