use std::collections::BTreeMap;
use std::io::IsTerminal;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use anyhow::{Context, Result, bail};
use pond::adapter::{self, Candidate};
use pond::config::{self, Config, CredsSet};
use pond::substrate::StorageUrl;
use toml_edit::{DocumentMut, Item, Table, value};
use crate::schedule::{self, ScheduleEvery};
// Command-line flags accepted by `pond init`.
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///` doc
// comments into help text, which would change the CLI output.
#[derive(Debug, clap::Args)]
pub(crate) struct InitArgs {
// `--storage-path <URL>`: storage destination, parsed by `crate::parse_storage_path`.
// When given, the storage prompt is skipped.
#[arg(long, value_parser = crate::parse_storage_path, value_name = "URL")]
storage_path: Option<StorageUrl>,
// `--adapters <NAMES>`: comma-separated adapter names to enable instead of prompting.
#[arg(long, value_delimiter = ',', value_name = "NAMES")]
adapters: Option<Vec<String>>,
// `--schedule <EVERY>`: sync interval to schedule instead of prompting.
#[arg(long, value_enum, value_name = "EVERY")]
schedule: Option<ScheduleEvery>,
// `--skip-mcp`: skip the MCP registration section entirely.
#[arg(long)]
skip_mcp: bool,
// `--yes` / `-y`: never prompt; also used as the answer to the MCP registration question.
#[arg(long, short = 'y')]
yes: bool,
// `--force`: start from the platform default storage path and from detected adapters
// rather than from what the existing config says.
#[arg(long)]
force: bool,
// `--config <PATH>` (or the `POND_CONFIG` environment variable): config file to edit.
#[arg(long, env = "POND_CONFIG", hide_env_values = true, value_name = "PATH")]
config: Option<PathBuf>,
}
/// cliclack theme used by the init wizard.
///
/// Only the prompt footer is customised: an active prompt and a failed prompt
/// close with `└` followed by the message, while a cancelled or submitted
/// prompt prints just the vertical bar.
struct WizardTheme;

impl cliclack::Theme for WizardTheme {
    /// Renders the footer line of a prompt for the given `state`.
    ///
    /// The trailing newline is appended outside the styled span.
    fn format_footer_with_message(&self, state: &cliclack::ThemeState, message: &str) -> String {
        use cliclack::ThemeState;
        format!(
            "{}\n",
            self.bar_color(state).apply_to(match state {
                // The corner glyph was previously stored as mis-decoded bytes ("â””").
                ThemeState::Active => format!("└ {message}"),
                ThemeState::Cancel | ThemeState::Submit => "│".to_owned(),
                ThemeState::Error(err) => format!("└ {err}"),
            })
        )
    }
}
fn wiz<T>(result: std::io::Result<T>) -> Result<T> {
match result {
Ok(inner) => Ok(inner),
Err(error) if error.kind() == std::io::ErrorKind::Interrupted => {
let _ = cliclack::outro_cancel("Cancelled - nothing written");
std::process::exit(1);
}
Err(error) => Err(error).context("prompt failed"),
}
}
/// Runs `pond init`: loads (or starts) the config document, settles the storage
/// destination, the enabled sources and an optional sync schedule, shows the
/// resulting plan, writes the config when its text changed, then handles MCP
/// registration and scheduling.
///
/// Returns an error when stdin is not a terminal and neither `--yes` nor any
/// answering flag was given, when the config cannot be read, parsed or written,
/// or when any of the steps below fails. Declining a confirmation exits the
/// process with status 1.
pub(crate) async fn run(args: InitArgs) -> Result<()> {
let config_file = crate::config_path(args.config.clone());
let interactive = std::io::stdin().is_terminal();
// Any of these flags counts as "the caller answered at least one section".
let any_flag = args.storage_path.is_some()
|| args.adapters.is_some()
|| args.schedule.is_some()
|| args.skip_mcp;
if !interactive && !args.yes && !any_flag {
bail!(
"stdin is not a terminal; run `pond init --yes` to accept defaults, or answer \
sections with --storage-path / --adapters / --schedule"
);
}
// Prompts are shown only on a terminal and only without `--yes`.
let prompts = interactive && !args.yes;
if prompts {
// Installs a no-op Ctrl-C handler; a failure to install it is ignored.
// NOTE(review): presumably so interrupts surface through the prompt (see `wiz`) - confirm.
let _ = ctrlc::set_handler(|| {});
cliclack::set_theme(WizardTheme);
}
// A missing config file is treated as an empty document.
let existing_text = if config_file.exists() {
std::fs::read_to_string(&config_file)
.with_context(|| format!("failed to read {}", config_file.display()))?
} else {
String::new()
};
let mut doc: DocumentMut = existing_text
.parse()
.with_context(|| format!("failed to parse {} as TOML", config_file.display()))?;
cliclack::intro("pond init")?;
// Legacy `[storage]` handling. The result is `Some(prefill)` when a legacy table was
// found and rewritten in `doc`, where `prefill` is the guessed storage URL (if any).
let legacy_prefill = match extract_legacy_storage(&doc) {
Some(legacy) => {
// "usable" means the guessed URL parses as a `StorageUrl`.
let usable = legacy_url_guess(&legacy)
.as_deref()
.is_some_and(|url| StorageUrl::parse(url).is_ok());
if prompts {
cliclack::log::warning(format!(
"{} uses the old [storage] passthrough format",
display_path(&config_file),
))?;
let rewrite = wiz(
cliclack::confirm(
"Rewrite it now? (keys move to [creds.default]; the endpoint folds into the storage URL)",
)
.initial_value(true)
.interact(),
)?;
if !rewrite {
cliclack::outro_cancel(
"Cancelled - the old [storage] format must be rewritten first (`pond config schema` shows the new shape)",
)?;
std::process::exit(1);
}
if !usable {
cliclack::log::warning(
"the old endpoint folds the bucket into the hostname; add the bucket and prefix to the URL below: s3+https://<host>/<bucket>/<prefix>",
)?;
}
} else if args.storage_path.is_none() && !usable {
// Without prompts there is no way to ask for the missing bucket, so
// an explicit --storage-path is required.
bail!(
"the old [storage] map folds the bucket into the endpoint hostname, so the destination URL can't be derived automatically; re-run as `pond init --storage-path s3+https://<host>/<bucket>/<prefix>` (this same run then moves the credentials into [creds.default])"
);
}
Some(apply_legacy_rewrite(&mut doc, &legacy))
}
None => None,
};
// Default shown/used for storage: with --force the platform default; otherwise the
// legacy guess, then the existing `storage.path`, then the platform default.
let default_storage = if args.force {
platform_default_storage()
} else {
legacy_prefill
.flatten()
.or_else(|| {
doc.get("storage")
.and_then(Item::as_table_like)
.and_then(|table| table.get("path"))
.and_then(Item::as_str)
.map(str::to_owned)
})
.unwrap_or_else(platform_default_storage)
};
let chosen = pick_storage(&args, &doc, &default_storage, prompts).await?;
let chosen_display = crate::storage_config_value(&chosen);
let current_path = doc
.get("storage")
.and_then(Item::as_table_like)
.and_then(|table| table.get("path"))
.and_then(Item::as_str)
.map(str::to_owned);
// Only touch `storage.path` when the value actually differs.
if current_path.as_deref() != Some(chosen_display.as_str()) {
crate::set_storage_path(&mut doc, &chosen_display);
}
// Sources: fresh (detected, not yet configured) rows are collected as accepts or
// declines and handed to `adapter::apply_to_doc`; already-configured rows only get
// their `enabled` flag flipped when it differs from the selection.
let rows = source_rows(&doc, args.force);
let picked = pick_sources(&args, &rows, prompts)?;
let mut fresh_accepts: Vec<Candidate> = Vec::new();
let mut fresh_declines: Vec<&str> = Vec::new();
for row in &rows {
let want = picked.contains(&row.name);
match &row.state {
RowState::Fresh(candidate) => {
if want {
fresh_accepts.push(candidate.clone());
} else {
fresh_declines.push(row.name.as_str());
}
}
RowState::Configured { enabled } => {
if *enabled != want {
doc["sources"][row.name.as_str()]["enabled"] = value(want);
}
}
}
}
adapter::apply_to_doc(&mut doc, &fresh_accepts, &fresh_declines)?;
// The embeddings model is only reported, never changed here.
let model = doc
.get("embeddings")
.and_then(Item::as_table_like)
.and_then(|table| table.get("model"))
.and_then(Item::as_str)
.map(str::to_owned)
.unwrap_or_else(|| pond::embed::DEFAULT_MODEL_ID.to_owned());
cliclack::log::info(format!(
"embeddings: {model} - override under [embeddings] in config"
))?;
// Schedule: the flag wins; otherwise ask only when prompting; otherwise no schedule.
let schedule_choice: Option<ScheduleEvery> = match args.schedule {
Some(every) => Some(every),
None if prompts => {
let wanted = wiz(
cliclack::confirm("Run pond sync automatically on a schedule?")
.initial_value(false)
.interact(),
)?;
if wanted {
Some(wiz(cliclack::select("How often?")
.item(ScheduleEvery::M15, "every 15 minutes", "")
.item(ScheduleEvery::H1, "every hour", "recommended")
.item(ScheduleEvery::H6, "every 6 hours", "")
.item(ScheduleEvery::D1, "daily", "")
.initial_value(ScheduleEvery::H1)
.interact())?)
} else {
None
}
}
None => None,
};
// Summarise what is about to be written.
let mut plan = format!("storage {chosen_display}");
let enabled: Vec<&str> = picked.iter().map(String::as_str).collect();
let disabled: Vec<&str> = rows
.iter()
.filter(|row| !picked.contains(&row.name))
.map(|row| row.name.as_str())
.collect();
plan.push_str(&format!(
"\nsources {}",
if enabled.is_empty() {
"(none)".to_owned()
} else {
enabled.join(", ")
},
));
if !disabled.is_empty() {
plan.push_str(&format!("\ndisabled {}", disabled.join(", ")));
}
if let Some(every) = schedule_choice {
plan.push_str(&format!("\nschedule pond sync every {}", every.label()));
}
plan.push_str(&format!("\nconfig {}", display_path(&config_file)));
cliclack::note("Plan", plan)?;
if prompts {
let write = wiz(cliclack::confirm("Write config?")
.initial_value(true)
.interact())?;
if !write {
cliclack::outro_cancel("Cancelled - nothing written")?;
std::process::exit(1);
}
}
// The file is rewritten only when the serialized document differs from what was read.
let new_text = doc.to_string();
let changed = new_text != existing_text;
if changed {
if let Some(parent) = config_file.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("failed to create {}", parent.display()))?;
}
std::fs::write(&config_file, &new_text)
.with_context(|| format!("failed to write {}", config_file.display()))?;
}
// MCP registration and scheduling happen after the config is on disk.
if !args.skip_mcp {
mcp_section(prompts, args.yes)?;
}
if let Some(every) = schedule_choice {
schedule::start(every)?;
}
cliclack::note(
"Next steps",
"pond sync import your sessions\npond status check health\npond --help explore the rest",
)?;
if changed {
cliclack::outro(format!("Config written to {}", display_path(&config_file)))?;
} else {
cliclack::outro("Already set up - nothing to change")?;
}
Ok(())
}
/// Formats `path` for messages, after passing it through `config::contract_home`.
fn display_path(path: &Path) -> String {
    let shortened = config::contract_home(path);
    shortened.display().to_string()
}
/// Returns the default storage location as a display string.
///
/// The location is derived from `XDG_DATA_HOME` and `HOME` through
/// `config::default_storage_path`. If that fails, or the result has no local
/// path, the literal `~/.local/share/pond` is returned instead.
fn platform_default_storage() -> String {
    let xdg_data_home = std::env::var_os("XDG_DATA_HOME").map(PathBuf::from);
    let home = std::env::var_os("HOME").map(PathBuf::from);
    let local = match config::default_storage_path(xdg_data_home, home) {
        Ok(url) => config::local_path(&url),
        Err(_) => None,
    };
    match local {
        Some(path) => config::contract_home(&path).display().to_string(),
        None => "~/.local/share/pond".to_owned(),
    }
}
/// Decides the storage destination for this run.
///
/// * With `--storage-path`, that URL is used; a non-local URL must pass
///   `probe_destination`, otherwise the function fails.
/// * Without prompts, `default` is parsed and used; a failed probe of a
///   non-local URL is only logged as a warning.
/// * With prompts, the user is asked (prefilled with `default`) until they
///   enter a local path, a destination that passes the probe, or confirm
///   keeping a destination whose probe failed.
///
/// Credentials for the probe are taken from `doc`; if the document does not
/// load as a `Config`, an empty credential set is used.
///
/// # Errors
///
/// Fails when the `--storage-path` probe fails, when `default` does not parse
/// in non-interactive mode, or when a prompt or log call fails.
async fn pick_storage(
    args: &InitArgs,
    doc: &DocumentMut,
    default: &str,
    prompts: bool,
) -> Result<StorageUrl> {
    let creds = Config::load_str(&doc.to_string())
        .map(|config| config.creds)
        .unwrap_or_default();

    // Explicit flag: a failed probe is a hard error.
    if let Some(chosen) = args.storage_path.clone() {
        if !chosen.is_local()
            && let Err(reason) = probe_destination(&chosen, &creds).await
        {
            bail!(
                "--storage-path {} failed the end-to-end check: {reason}; fix the creds (define [creds.default] or POND_CREDS_DEFAULT_*) or pick another destination",
                chosen.display(),
            );
        }
        return Ok(chosen);
    }

    // Non-interactive: accept the default, warning (not failing) on a bad probe.
    if !prompts {
        let chosen = StorageUrl::parse(default)
            .with_context(|| format!("existing storage path {default:?} does not parse"))?;
        if !chosen.is_local()
            && let Err(reason) = probe_destination(&chosen, &creds).await
        {
            cliclack::log::warning(format!(
                "configured storage {} failed the end-to-end check: {reason}",
                chosen.display(),
            ))?;
        }
        return Ok(chosen);
    }

    // Interactive: re-ask with the last answer prefilled until one is accepted.
    let mut current = default.to_owned();
    loop {
        let text: String = wiz(cliclack::input("Where should pond store its data?")
            // Was `¤t`: `&current` corrupted through the `&curren` HTML entity.
            .default_input(&current)
            .validate(|input: &String| {
                StorageUrl::parse(input)
                    .map(|_| ())
                    .map_err(|error| format!("{error:#}"))
            })
            .interact())?;
        let chosen = StorageUrl::parse(&text)?;
        if chosen.is_local() {
            return Ok(chosen);
        }
        match probe_destination(&chosen, &creds).await {
            Ok(()) => return Ok(chosen),
            Err(reason) => {
                cliclack::log::warning(format!(
                    "{reason}\nCreds bind via [creds.default] in config or POND_CREDS_DEFAULT_* env (spec: creds scope match)."
                ))?;
                let keep = wiz(cliclack::confirm("Keep this destination anyway?")
                    .initial_value(false)
                    .interact())?;
                if keep {
                    return Ok(chosen);
                }
                current = text;
            }
        }
    }
}
/// Resolves `url` against `creds` and runs `pond::substrate::storage_check`
/// on the result, showing a spinner while the check is in flight.
///
/// Returns `Ok(())` when the check succeeds. On failure the error is a
/// one-line reason: the resolve error, or the check failure followed by its
/// concise cause in parentheses when one is available. The full failure is
/// also emitted at debug level.
async fn probe_destination(
    url: &StorageUrl,
    creds: &BTreeMap<String, CredsSet>,
) -> std::result::Result<(), String> {
    let resolved = match url.resolve(creds) {
        Ok(resolved) => resolved,
        Err(error) => return Err(format!("{error:#}")),
    };

    let spinner = cliclack::spinner();
    spinner.start(format!("Probing {}...", url.display()));

    let Err(failure) = pond::substrate::storage_check(&resolved).await else {
        spinner.stop(format!(
            "storage: {} reachable - conditional writes (OCC) supported",
            url.display(),
        ));
        return Ok(());
    };

    spinner.error("storage probe failed");
    tracing::debug!("full probe failure: {failure:?}");
    let reason = if let Some(cause) = failure.concise_cause() {
        format!("{failure} ({cause})")
    } else {
        failure.to_string()
    };
    Err(reason)
}
/// Where a source row comes from.
enum RowState {
/// The adapter already has an entry under `[sources]`; `enabled` is its current flag.
Configured { enabled: bool },
/// The adapter was discovered on this machine but has no `[sources]` entry yet.
Fresh(Candidate),
}
/// One selectable entry in the source picker.
struct SourceRow {
/// Adapter name; also the key under `[sources]`.
name: String,
/// Secondary text shown next to the name in the picker.
hint: String,
/// Whether the row is already configured or freshly discovered.
state: RowState,
/// Whether the row starts selected; also the selection used when not prompting.
preselected: bool,
}
fn source_rows(doc: &DocumentMut, force: bool) -> Vec<SourceRow> {
let configured = doc.get("sources").and_then(Item::as_table_like);
let candidates = adapter::discover(None);
let candidate_for = |name: &str| candidates.iter().find(|c| c.name == name);
let mut rows = Vec::new();
let mut seen: Vec<&str> = Vec::new();
for factory in adapter::registry() {
let name = factory.name();
seen.push(name);
let entry = configured.and_then(|table| table.get(name));
match (entry, candidate_for(name)) {
(Some(item), candidate) => {
let enabled = item
.as_table_like()
.and_then(|table| table.get("enabled"))
.and_then(Item::as_bool)
.unwrap_or(false);
let hint = item
.as_table_like()
.and_then(|table| table.get("path"))
.and_then(Item::as_str)
.map(|path| config::contract_home(Path::new(path)).display().to_string())
.or_else(|| candidate.map(|c| c.hint.clone()))
.unwrap_or_default();
rows.push(SourceRow {
name: name.to_owned(),
hint,
state: RowState::Configured { enabled },
preselected: if force { candidate.is_some() } else { enabled },
});
}
(None, Some(candidate)) => rows.push(SourceRow {
name: name.to_owned(),
hint: candidate.hint.clone(),
state: RowState::Fresh(candidate.clone()),
preselected: true,
}),
(None, None) => {}
}
}
if let Some(table) = configured {
for (name, item) in table.iter() {
if seen.contains(&name) {
continue;
}
let enabled = item
.as_table_like()
.and_then(|t| t.get("enabled"))
.and_then(Item::as_bool)
.unwrap_or(false);
rows.push(SourceRow {
name: name.to_owned(),
hint: "(unknown adapter)".to_owned(),
state: RowState::Configured { enabled },
preselected: !force && enabled,
});
}
}
rows
}
/// Chooses which sources end up enabled and returns their names.
///
/// * With `--adapters`, each name must be a known adapter and must have a row
///   (detected or configured); the requested list is returned as given.
/// * With no rows at all, an info line is logged and nothing is selected.
/// * Without prompts, the preselected rows are returned.
/// * Otherwise a multiselect is shown with the preselected rows ticked.
fn pick_sources(args: &InitArgs, rows: &[SourceRow], prompts: bool) -> Result<Vec<String>> {
    if let Some(requested) = args.adapters.as_ref() {
        let known = adapter::known_names();
        for name in requested {
            let is_known = known.contains(&name.as_str());
            if !is_known {
                bail!("unknown adapter {name:?}; known: {}", known.join(", "));
            }
            let has_row = rows.iter().any(|row| &row.name == name);
            if !has_row {
                bail!(
                    "adapter {name:?} was not detected on this machine and has no [sources.{name}] entry; pass a path via `pond sync {name} --source-dir <path>` or add the section manually"
                );
            }
        }
        return Ok(requested.clone());
    }

    if rows.is_empty() {
        cliclack::log::info("sources: none detected - add [sources.<adapter>] entries manually")?;
        return Ok(Vec::new());
    }

    // Names of the rows that start selected.
    let preselected_names = || -> Vec<String> {
        rows.iter()
            .filter(|row| row.preselected)
            .map(|row| row.name.clone())
            .collect()
    };

    if !prompts {
        return Ok(preselected_names());
    }

    let base = cliclack::multiselect("Which sources should pond sync?")
        .required(false)
        .initial_values(preselected_names());
    let picker = rows.iter().fold(base, |picker, row| {
        picker.item(row.name.clone(), &row.name, &row.hint)
    });
    wiz(picker.interact())
}
/// Handles MCP registration for the agent CLIs found on `PATH`.
///
/// * If neither `claude` nor `codex` is found, an info line is logged.
/// * For `claude`: if `claude mcp get pond` succeeds, pond counts as already
///   registered. Otherwise the user is asked (when `prompts`) or `auto`
///   decides, and on yes `claude mcp add -s user pond -- pond mcp` is run. A
///   non-zero exit is reported as a warning, not an error.
/// * For `codex`: only a note with the manual command is printed.
///
/// # Errors
///
/// Fails when `claude mcp add` cannot be spawned or a prompt/log call fails.
fn mcp_section(prompts: bool, auto: bool) -> Result<()> {
    let has_claude = crate::find_on_path("claude").is_some();
    let has_codex = crate::find_on_path("codex").is_some();

    if !(has_claude || has_codex) {
        cliclack::log::info(
            "mcp: no agent CLI detected - register later with `claude mcp add -s user pond -- pond mcp`",
        )?;
        return Ok(());
    }

    if has_claude {
        // A probe that cannot be spawned counts as "not registered".
        let probe = Command::new("claude")
            .args(["mcp", "get", "pond"])
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status();
        let already_registered = matches!(probe, Ok(status) if status.success());

        if already_registered {
            cliclack::log::success("mcp: pond is already registered in Claude Code")?;
        } else {
            let wants_registration = if prompts {
                let question = cliclack::confirm("Register pond as an MCP server in Claude Code?")
                    .initial_value(true);
                wiz(question.interact())?
            } else {
                auto
            };

            if !wants_registration {
                cliclack::log::info(
                    "mcp: skipped - register later with `claude mcp add -s user pond -- pond mcp`",
                )?;
            } else {
                let output = Command::new("claude")
                    .args(["mcp", "add", "-s", "user", "pond", "--", "pond", "mcp"])
                    .output()
                    .context("failed to run `claude mcp add`")?;
                if !output.status.success() {
                    cliclack::log::warning(format!(
                        "mcp: `claude mcp add` exited {}: {} - run `claude mcp add -s user pond -- pond mcp` manually",
                        output.status,
                        String::from_utf8_lossy(&output.stderr).trim(),
                    ))?;
                } else {
                    cliclack::log::success("mcp: registered in Claude Code (user scope)")?;
                }
            }
        }
    }

    if has_codex {
        cliclack::note(
            "codex detected",
            "register pond manually:\n codex mcp add pond -- pond mcp",
        )?;
    }

    Ok(())
}
/// Values lifted out of an old-format `[storage]` table.
///
/// The three key fields are looked up case-insensitively using the key lists
/// in `config::LEGACY_*_KEYS`; `path` is the table's plain `path` string.
struct LegacyStorage {
/// Access key id, if one of `config::LEGACY_ACCESS_KEY_KEYS` was present.
access_key_id: Option<String>,
/// Secret access key, if one of `config::LEGACY_SECRET_KEY_KEYS` was present.
secret_access_key: Option<String>,
/// Endpoint, if one of `config::LEGACY_ENDPOINT_KEYS` was present.
endpoint: Option<String>,
/// The `path` value, if it was a string.
path: Option<String>,
}
/// Detects an old-format `[storage]` table and extracts its values.
///
/// Returns `None` when there is no `[storage]` table or when it contains no
/// key other than `path`. Otherwise the credential and endpoint keys are
/// matched case-insensitively; a matching key whose value is not a string
/// yields an empty string.
fn extract_legacy_storage(doc: &DocumentMut) -> Option<LegacyStorage> {
    let storage = doc.get("storage").and_then(Item::as_table_like)?;

    // A table holding nothing but `path` is already in the new format.
    if storage.iter().all(|(key, _)| key == "path") {
        return None;
    }

    // First entry whose key matches any of `names`, ignoring ASCII case.
    let lookup = |names: &[&str]| -> Option<String> {
        for (key, item) in storage.iter() {
            if names.iter().any(|name| key.eq_ignore_ascii_case(name)) {
                return Some(item.as_str().unwrap_or_default().to_owned());
            }
        }
        None
    };

    let access_key_id = lookup(config::LEGACY_ACCESS_KEY_KEYS);
    let secret_access_key = lookup(config::LEGACY_SECRET_KEY_KEYS);
    let endpoint = lookup(config::LEGACY_ENDPOINT_KEYS);
    let path = storage
        .get("path")
        .and_then(Item::as_str)
        .map(str::to_owned);

    Some(LegacyStorage {
        access_key_id,
        secret_access_key,
        endpoint,
        path,
    })
}
/// Rewrites an old-format config in place and returns the guessed storage URL.
///
/// `[storage]` is replaced by an empty table. If either credential is
/// present, both available values are written to `[creds.default]`, replacing
/// any existing `default` entry. The return value is `legacy_url_guess(legacy)`.
fn apply_legacy_rewrite(doc: &mut DocumentMut, legacy: &LegacyStorage) -> Option<String> {
    doc["storage"] = Item::Table(Table::new());

    let has_credentials = legacy.access_key_id.is_some() || legacy.secret_access_key.is_some();
    if has_credentials {
        let mut set = Table::new();
        let fields = [
            ("access_key_id", &legacy.access_key_id),
            ("secret_access_key", &legacy.secret_access_key),
        ];
        for (field, stored) in fields {
            if let Some(secret) = stored {
                set.insert(field, value(secret));
            }
        }

        if let Some(creds) = doc.get_mut("creds").and_then(Item::as_table_mut) {
            creds.insert("default", Item::Table(set));
        } else {
            // No usable `creds` table yet: create an implicit parent so only
            // `[creds.default]` is rendered.
            let mut creds = Table::new();
            creds.set_implicit(true);
            creds.insert("default", Item::Table(set));
            doc.insert("creds", Item::Table(creds));
        }
    }

    legacy_url_guess(legacy)
}
/// Guesses a new-format storage URL from legacy values.
///
/// The host is whatever follows `://` in the endpoint, without trailing
/// slashes. Rules are tried in this order:
/// 1. host plus an `s3://` path gives `s3+https://<host>/<rest of path>`;
/// 2. a path that already parses as a `StorageUrl` is returned unchanged;
/// 3. a host alone gives `s3+https://<host>/`;
/// 4. otherwise the path is returned as-is, or `None` if there is none.
fn legacy_url_guess(legacy: &LegacyStorage) -> Option<String> {
    let host = legacy
        .endpoint
        .as_deref()
        .and_then(|endpoint| endpoint.split("://").nth(1))
        .map(|host| host.trim_end_matches('/'));
    let path = legacy.path.as_deref();

    if let (Some(host), Some(path)) = (host, path) {
        if path.starts_with("s3://") {
            return Some(format!(
                "s3+https://{host}/{}",
                path.trim_start_matches("s3://"),
            ));
        }
    }
    if let Some(path) = path {
        if StorageUrl::parse(path).is_ok() {
            return Some(path.to_owned());
        }
    }
    if let Some(host) = host {
        return Some(format!("s3+https://{host}/"));
    }
    path.map(str::to_owned)
}
// Unit tests for the legacy `[storage]` detection, rewrite and URL guessing.
#[cfg(test)]
mod tests {
#![allow(clippy::expect_used, clippy::unwrap_used)]
use super::*;
// A legacy table with keys, region and endpoint but no path: credentials move to
// `[creds.default]`, the old keys and the region disappear, other sections survive,
// the prefill is the bucketless endpoint URL, and the result still loads as a Config.
#[test]
fn legacy_rewrite_moves_keys_and_prefills_the_url() {
let mut doc: DocumentMut = r#"
[sources.claude-code]
enabled = true
path = "/srv/claude"
[storage]
AWS_ACCESS_KEY_ID = "AKIA123"
AWS_SECRET_ACCESS_KEY = "shh"
AWS_REGION = "nbg1"
AWS_ENDPOINT = "https://nbg1.example.com"
"#
.parse()
.unwrap();
let legacy = extract_legacy_storage(&doc).expect("legacy map detected");
let prefill = apply_legacy_rewrite(&mut doc, &legacy);
assert_eq!(prefill.as_deref(), Some("s3+https://nbg1.example.com/"));
let body = doc.to_string();
assert!(body.contains("[creds.default]"), "got: {body}");
assert!(body.contains("access_key_id = \"AKIA123\""), "got: {body}");
assert!(!body.contains("AWS_ACCESS_KEY_ID"), "got: {body}");
assert!(
!body.contains("nbg1\""),
"region must not carry over: {body}"
);
assert!(body.contains("[sources.claude-code]"), "got: {body}");
Config::load_str(&body).expect("rewritten config loads");
}
// A path that is already a full storage URL is returned unchanged, even with an endpoint set.
#[test]
fn legacy_url_guess_prefers_a_full_url_path_over_the_endpoint() {
let legacy = LegacyStorage {
access_key_id: Some("AKIA123".to_owned()),
secret_access_key: Some("shh".to_owned()),
endpoint: Some("https://nbg1.example.com".to_owned()),
path: Some("s3+https://nbg1.example.com/bucket/prefix".to_owned()),
};
assert_eq!(
legacy_url_guess(&legacy).as_deref(),
Some("s3+https://nbg1.example.com/bucket/prefix"),
);
}
// An `s3://` path is combined with the endpoint host; without an endpoint it is kept as-is.
#[test]
fn legacy_url_guess_folds_the_endpoint_into_a_plain_s3_path() {
let legacy = LegacyStorage {
access_key_id: Some("AKIA123".to_owned()),
secret_access_key: Some("shh".to_owned()),
endpoint: Some("https://nbg1.example.com".to_owned()),
path: Some("s3://mybucket/agent-sessions".to_owned()),
};
assert_eq!(
legacy_url_guess(&legacy).as_deref(),
Some("s3+https://nbg1.example.com/mybucket/agent-sessions"),
);
let ambient = LegacyStorage {
endpoint: None,
..legacy
};
assert_eq!(
legacy_url_guess(&ambient).as_deref(),
Some("s3://mybucket/agent-sessions"),
);
}
// An endpoint with no path yields a bucketless URL, which must fail `StorageUrl::parse`
// so callers treat it as unusable.
#[test]
fn legacy_url_guess_is_bucketless_for_a_virtual_hosted_endpoint() {
let legacy = LegacyStorage {
access_key_id: Some("AKIA123".to_owned()),
secret_access_key: Some("shh".to_owned()),
endpoint: Some("https://ttq.nbg1.your-objectstorage.com".to_owned()),
path: None,
};
let guess = legacy_url_guess(&legacy).expect("a guess is produced");
assert_eq!(guess, "s3+https://ttq.nbg1.your-objectstorage.com/");
assert!(
StorageUrl::parse(&guess).is_err(),
"bucketless guess must not validate: {guess}"
);
}
// A `[storage]` table with only `path`, and an empty document, are not legacy.
#[test]
fn new_format_storage_is_not_flagged_as_legacy() {
let doc: DocumentMut = "[storage]\npath = \"~/.local/share/pond\"\n"
.parse()
.unwrap();
assert!(extract_legacy_storage(&doc).is_none());
let empty: DocumentMut = "".parse().unwrap();
assert!(extract_legacy_storage(&empty).is_none());
}
}