use std::collections::{BTreeMap, HashSet};
use std::env;
use std::fs;
use std::io::{BufRead, BufReader, BufWriter, Read, Write};
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, ExitCode};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, bail, Context, Result};
use serde::{Deserialize, Serialize};
use crate::content_trust::ContentTrust;
use crate::output::CommandReport;
use crate::paths::state::{
MemoryConfig as RawMemoryConfig, MemoryProviderConfig as RawMemoryProviderConfig, StateLayout,
};
use crate::profile;
use crate::repo::marker as repo_marker;
use crate::state::compiled;
use crate::state::runtime::{self as runtime_state, RuntimeMemoryEntry, RuntimeMemoryOrigin};
use super::recall::{
ProviderDescriptor as DetailedProviderDescriptor,
ProviderReference as DetailedProviderReference, RecallHealthView as DetailedRecallHealthView,
RecallProvenance as DetailedRecallProvenance, RecallProvider as DetailedRecallProvider,
RecallProviderCapabilities as DetailedRecallProviderCapabilities,
RecallResult as DetailedRecallResult, RecallScope as DetailedRecallScope,
RecallTemporalHints as DetailedRecallTemporalHints, StartRecallBudget,
};
// Name of the built-in authored-markdown provider; it is both the
// authoritative provider in v1 and the fallback when external config fails.
const BUILTIN_PROVIDER_NAME: &str = "markdown";
// Default per-request timeout for external provider commands, in seconds.
const DEFAULT_PROVIDER_TIMEOUT_S: u64 = 10;
// Default result limits for recall search / expand when not configured.
const DEFAULT_RECALL_SEARCH_LIMIT: usize = 5;
const DEFAULT_RECALL_EXPAND_LIMIT: usize = 3;
// Limits applied to start-time recall output (result count / preview length).
const START_RECALL_LIMIT: usize = 3;
const START_RECALL_PREVIEW_CHARS: usize = 240;
// Operations an external recall provider must declare to be usable.
const REQUIRED_EXTERNAL_CAPABILITIES: &[&str] = &["search", "describe", "expand", "health"];
// Reporting operations an advisory ingest provider must declare.
const REQUIRED_INGEST_REPORTING_CAPABILITIES: &[&str] = &["sync-status", "source-map"];
// Raw scope tokens accepted in provider configs (see `canonical_scope_name`).
const VALID_SCOPES: &[&str] = &["profile", "repo", "pod", "branch", "clone"];
/// Maps a raw config scope token to its canonical display name.
///
/// Any token outside `VALID_SCOPES` maps to `"unknown"`.
fn canonical_scope_name(scope: &str) -> &'static str {
    const CANONICAL: &[(&str, &str)] = &[
        ("profile", "profile"),
        ("repo", "project"),
        ("pod", "pod"),
        ("branch", "work_stream"),
        ("clone", "workspace"),
    ];
    CANONICAL
        .iter()
        .find(|entry| entry.0 == scope)
        .map_or("unknown", |entry| entry.1)
}
/// Implementation flavor of a memory provider.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProviderKind {
    AuthoredMarkdown,
    ExternalCommand,
}
impl ProviderKind {
    /// Stable machine-readable label used in views and reports.
    fn as_str(self) -> &'static str {
        if matches!(self, Self::AuthoredMarkdown) {
            "authored-markdown"
        } else {
            "external-command"
        }
    }
}
/// Configuration layer a resolved provider originated from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProviderSource {
    BuiltinFallback,
    ProfileConfig,
    RepoOverlay,
}
impl ProviderSource {
    /// Stable machine-readable label naming the originating layer.
    fn as_str(self) -> &'static str {
        match self {
            Self::RepoOverlay => "repo-overlay",
            Self::ProfileConfig => "profile-config",
            Self::BuiltinFallback => "built-in-fallback",
        }
    }
}
/// Whether start-time recall runs automatically or is turned off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartRecallPolicy {
    Disabled,
    Auto,
}
impl StartRecallPolicy {
    /// Stable machine-readable label for the policy mode.
    fn as_str(self) -> &'static str {
        if matches!(self, Self::Auto) {
            "auto"
        } else {
            "disabled"
        }
    }
}
/// Tracks which of the four recall operations a provider supports.
#[derive(Debug, Clone, Default)]
struct RecallCapabilities {
    search: bool,
    describe: bool,
    expand: bool,
    health: bool,
}
impl RecallCapabilities {
    /// Capability set with every recall operation enabled.
    fn all() -> Self {
        Self {
            search: true,
            describe: true,
            expand: true,
            health: true,
        }
    }
    /// Builds a capability set from provider-declared names.
    ///
    /// Names are normalized before matching; any unknown name is an error.
    fn from_declared(values: &[String]) -> Result<Self> {
        let mut declared = Self::default();
        for raw in values {
            let flag = match normalize_name(raw).as_str() {
                "search" => &mut declared.search,
                "describe" => &mut declared.describe,
                "expand" => &mut declared.expand,
                "health" => &mut declared.health,
                other => bail!("unsupported recall capability `{other}`"),
            };
            *flag = true;
        }
        Ok(declared)
    }
    /// True when search, describe, expand, and health are all supported.
    fn supports_all_recall_ops(&self) -> bool {
        [self.search, self.describe, self.expand, self.health]
            .into_iter()
            .all(|enabled| enabled)
    }
    /// Names of the supported operations, in canonical order.
    fn names(&self) -> Vec<String> {
        [
            ("search", self.search),
            ("describe", self.describe),
            ("expand", self.expand),
            ("health", self.health),
        ]
        .into_iter()
        .filter(|(_, enabled)| *enabled)
        .map(|(name, _)| name.to_owned())
        .collect()
    }
}
/// How a resolved provider actually serves requests.
#[derive(Debug, Clone)]
enum ProviderBackend {
    /// Served in-process from local runtime memory (built-in markdown path).
    AuthoredMarkdown,
    /// Delegated to an external executable speaking the JSON line protocol.
    ExternalCommand(ExternalCommandConfig),
}
/// Process model used for an external provider command.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExternalCommandMode {
    /// Spawn a fresh process per request.
    OneShot,
    /// Keep one long-lived process and exchange request/response lines.
    CommandSession,
}
/// Invocation settings for an external provider command.
#[derive(Debug, Clone)]
struct ExternalCommandConfig {
    mode: ExternalCommandMode,
    // argv: first element is the binary (resolved via PATH), rest are args.
    command: Vec<String>,
    // Per-request timeout in seconds.
    timeout_s: u64,
}
/// Fully resolved recall provider: identity, capabilities, and backend.
#[derive(Debug, Clone)]
struct ResolvedRecallProvider {
    name: String,
    kind: ProviderKind,
    source: ProviderSource,
    capabilities: RecallCapabilities,
    // Scopes this provider is allowed to answer for.
    allowed_scopes: Vec<String>,
    // Free-form key/value inputs forwarded verbatim in each request.
    inputs: BTreeMap<String, String>,
    backend: ProviderBackend,
}
/// Fully resolved advisory ingest provider (reporting operations only).
#[derive(Debug, Clone)]
struct ResolvedIngestProvider {
    name: String,
    kind: ProviderKind,
    source: ProviderSource,
    // Declared capability names (e.g. "sync-status", "source-map").
    capabilities: Vec<String>,
    allowed_scopes: Vec<String>,
    inputs: BTreeMap<String, String>,
    backend: ProviderBackend,
}
/// Memory configuration after profile and repo-overlay layers are merged.
#[derive(Debug, Clone)]
struct EffectiveMemoryConfig {
    authoritative_provider: String,
    configured_recall_provider: Option<String>,
    configured_ingest_provider: Option<String>,
    start_recall_policy: Option<String>,
    providers: BTreeMap<String, ResolvedProviderEntry>,
    ingest_providers: BTreeMap<String, ResolvedProviderEntry>,
}
/// On-disk shape of a config file carrying `memory.start` settings.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct MemoryStartConfigFile {
    memory: MemoryStartFileMemoryConfig,
}
/// `memory` table wrapper inside `MemoryStartConfigFile`.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct MemoryStartFileMemoryConfig {
    start: MemoryStartConfig,
}
/// User-tunable start-time recall settings; every field is optional so
/// unset values fall back to defaults (see `enabled_for` / `budget`).
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(default)]
struct MemoryStartConfig {
    enabled: Option<bool>,
    search_limit: Option<usize>,
    describe_limit: Option<usize>,
    expand_limit: Option<usize>,
}
impl MemoryStartConfig {
    /// Whether start-time recall is enabled; when unset, it defaults to
    /// "enabled iff a provider is configured".
    fn enabled_for(&self, has_provider: bool) -> bool {
        match self.enabled {
            Some(explicit) => explicit,
            None => has_provider,
        }
    }
    /// Builds the start-recall budget, keeping defaults for any limit the
    /// user did not configure.
    fn budget(&self) -> StartRecallBudget {
        let mut budget = StartRecallBudget::default();
        budget.search_limit = self.search_limit.unwrap_or(budget.search_limit);
        budget.describe_limit = self.describe_limit.unwrap_or(budget.describe_limit);
        budget.expand_limit = self.expand_limit.unwrap_or(budget.expand_limit);
        budget
    }
    /// Overlays repo-level settings on top of profile-level settings;
    /// overlay values win whenever present.
    fn merge(profile: &Self, overlay: &Self) -> Self {
        Self {
            enabled: overlay.enabled.or(profile.enabled),
            search_limit: overlay.search_limit.or(profile.search_limit),
            describe_limit: overlay.describe_limit.or(profile.describe_limit),
            expand_limit: overlay.expand_limit.or(profile.expand_limit),
        }
    }
}
/// A raw provider config plus the layer it was resolved from.
#[derive(Debug, Clone)]
struct ResolvedProviderEntry {
    source: ProviderSource,
    config: RawMemoryProviderConfig,
}
/// One diagnostic raised while resolving or health-checking providers.
#[derive(Debug, Clone)]
pub struct MemoryProviderIssue {
    pub check: &'static str,
    pub file: String,
    pub message: String,
}
/// Outcome of provider inspection: the public view, any issues, and the
/// privately retained resolved provider with fallback bookkeeping.
#[derive(Debug, Clone)]
pub struct MemoryProviderInspection {
    pub view: MemoryProviderView,
    pub issues: Vec<MemoryProviderIssue>,
    resolved: ResolvedRecallProvider,
    // Name of the configured provider we fell back from, if any.
    fallback_from: Option<String>,
}
/// Everything needed to run start-time recall: the configured provider (if
/// usable), the built-in fallback, the budget, and any config load error.
#[derive(Debug)]
pub(crate) struct PreparedStartRecall {
    pub(crate) provider: Option<ResolvedRecallAdapter>,
    pub(crate) fallback: ResolvedRecallAdapter,
    pub(crate) budget: StartRecallBudget,
    pub(crate) config_error: Option<String>,
}
/// Public JSON view of the memory provider inspection result.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryProviderView {
    // e.g. "loaded" or "fallback" (set by `inspect_provider`).
    pub status: &'static str,
    pub authoritative_provider: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub configured_recall_provider: Option<String>,
    pub effective_recall_provider: ProviderIdentityView,
    pub start_recall_policy: StartRecallPolicyView,
    pub health: ProviderHealthView,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub issues: Vec<String>,
    pub advisory_ingest_provider: AdvisoryIngestProviderView,
}
/// Advisory ingest provider section of the memory view.
#[derive(Debug, Clone, Serialize)]
pub struct AdvisoryIngestProviderView {
    pub status: &'static str,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub configured_provider: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub effective_provider: Option<ProviderIdentityView>,
    pub sync_status: AdvisoryIngestSyncStatusView,
    pub source_map: AdvisoryIngestSourceMapView,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub issues: Vec<String>,
}
/// Provider-reported ingest sync status (deserialized from provider JSON).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvisoryIngestSyncStatusView {
    pub status: String,
    pub message: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub checked_at_epoch_s: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub latest_successful_ingest_at_epoch_s: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lag_seconds: Option<u64>,
    // `default` added: this struct is Deserialize and the field is skipped
    // when empty on serialization, so a payload omitting it must still parse
    // (matches the `default` + `skip_serializing_if` pattern used elsewhere
    // in this module).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub scoped_source_coverage: Vec<String>,
}
/// Provider-reported ingest source map (deserialized from provider JSON).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvisoryIngestSourceMapView {
    pub status: String,
    pub message: String,
    // `default` added for the same round-trip reason as above.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub sources: Vec<AdvisoryIngestSourceMapEntryView>,
}
/// One entry in the advisory ingest source map.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvisoryIngestSourceMapEntryView {
    pub source_id: String,
    pub source_kind: String,
    pub ccd_scope: String,
    pub state: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub uri: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observed_at_epoch_s: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider_scope: Option<String>,
}
/// Identity summary of a resolved provider for JSON output.
#[derive(Debug, Clone, Serialize)]
pub struct ProviderIdentityView {
    pub name: String,
    pub kind: String,
    pub source: String,
    pub capabilities: Vec<String>,
    pub allowed_scopes: Vec<String>,
}
/// Effective start-recall policy for JSON output.
#[derive(Debug, Clone, Serialize)]
pub struct StartRecallPolicyView {
    pub mode: String,
    pub search_limit: usize,
    pub preview_chars: usize,
}
/// Provider health summary for JSON output.
#[derive(Debug, Clone, Serialize)]
pub struct ProviderHealthView {
    pub status: String,
    pub message: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub checked_at_epoch_s: Option<u64>,
}
/// Start-time recall summary rendered into startup output.
#[derive(Debug, Clone, Serialize)]
pub struct StartupRecallView {
    pub status: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub query: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider: Option<String>,
    pub results: Vec<RecallResult>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
}
/// JSON report for the memory-search / memory-describe / memory-expand CLI
/// commands.
#[derive(Debug, Clone, Serialize)]
pub struct RecallCommandReport {
    pub command: &'static str,
    pub ok: bool,
    pub status: &'static str,
    pub path: String,
    pub profile: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub query: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub native_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub limit: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub configured_provider: Option<String>,
    pub effective_provider: String,
    pub effective_provider_kind: String,
    pub fallback_used: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub fallback_from: Option<String>,
    // NOTE(review): `default` is inert here (no Deserialize derive).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub results: Vec<RecallResult>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
/// JSON report for the advisory ingest CLI commands.
#[derive(Debug, Clone, Serialize)]
pub struct AdvisoryIngestCommandReport {
    pub command: &'static str,
    pub ok: bool,
    pub status: String,
    pub path: String,
    pub profile: String,
    pub configured_provider: String,
    pub effective_provider: String,
    pub effective_provider_kind: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sync_status: Option<AdvisoryIngestSyncStatusView>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_map: Option<AdvisoryIngestSourceMapView>,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub warnings: Vec<String>,
}
/// Recall operation selector for CLI command plumbing.
#[derive(Debug, Clone, Copy)]
enum RecallCommandKind {
    Search,
    Describe,
    Expand,
}
/// Advisory ingest operation selector.
#[derive(Debug, Clone, Copy)]
enum AdvisoryIngestCommandKind {
    SyncStatus,
    SourceMap,
}
/// Borrowed argument bundle shared by the recall CLI commands.
#[derive(Clone, Copy)]
struct RecallCommandRequest<'a> {
    query: Option<&'a str>,
    native_id: Option<&'a str>,
    limit: Option<usize>,
}
/// Results plus non-fatal warnings from executing a recall command.
struct RecallCommandExecution {
    results: Vec<RecallResult>,
    warnings: Vec<String>,
}
impl RecallCommandKind {
    /// CLI command name emitted in reports for this operation.
    fn command_name(self) -> &'static str {
        self.labels().0
    }
    /// Message used when the operation returns zero results.
    fn empty_message(self) -> &'static str {
        self.labels().1
    }
    /// (command name, empty-result message) pair for each variant.
    fn labels(self) -> (&'static str, &'static str) {
        match self {
            Self::Search => ("memory-search", "memory recall search returned no results"),
            Self::Describe => ("memory-describe", "memory recall describe returned no results"),
            Self::Expand => ("memory-expand", "memory recall expand returned no results"),
        }
    }
}
impl CommandReport for RecallCommandReport {
    /// Maps `ok` to the process exit code (0 on success, 1 on failure).
    fn exit_code(&self) -> ExitCode {
        if self.ok {
            ExitCode::SUCCESS
        } else {
            ExitCode::FAILURE
        }
    }
    /// Renders the human-readable report; optional sections are printed only
    /// when populated, mirroring the JSON serialization.
    fn render_text(&self) {
        println!("{}: {}", self.command, self.status);
        println!("path: {}", self.path);
        println!("profile: {}", self.profile);
        println!(
            "effective provider: {} ({})",
            self.effective_provider, self.effective_provider_kind
        );
        if let Some(configured_provider) = &self.configured_provider {
            println!("configured provider: {configured_provider}");
        }
        if self.fallback_used {
            // Prefer naming the provider we fell back from when it is known.
            if let Some(fallback_from) = &self.fallback_from {
                println!("fallback from: {fallback_from}");
            } else {
                println!("fallback used: true");
            }
        }
        if let Some(query) = &self.query {
            println!("query: {query}");
        }
        if let Some(native_id) = &self.native_id {
            println!("native id: {native_id}");
        }
        if let Some(limit) = self.limit {
            println!("limit: {limit}");
        }
        if let Some(message) = &self.message {
            println!("{message}");
        }
        for warning in &self.warnings {
            println!("warning: {warning}");
        }
        // One bullet per result, with the preview (if any) on the next line.
        for result in &self.results {
            println!(
                "- [{}] {} ({})",
                result.scope, result.title, result.reference.native_id
            );
            if let Some(preview) = &result.preview {
                println!(" {preview}");
            }
        }
    }
}
impl AdvisoryIngestCommandKind {
    /// CLI command name emitted in reports for this operation.
    fn command_name(self) -> &'static str {
        self.names().0
    }
    /// Operation name used in the provider wire protocol.
    fn operation_name(self) -> &'static str {
        self.names().1
    }
    /// (command name, protocol operation name) pair for each variant.
    fn names(self) -> (&'static str, &'static str) {
        match self {
            Self::SyncStatus => ("memory-sync-status", "sync-status"),
            Self::SourceMap => ("memory-source-map", "source-map"),
        }
    }
}
impl CommandReport for AdvisoryIngestCommandReport {
    /// Maps `ok` to the process exit code (0 on success, 1 on failure).
    fn exit_code(&self) -> ExitCode {
        if self.ok {
            ExitCode::SUCCESS
        } else {
            ExitCode::FAILURE
        }
    }
    /// Renders the human-readable report for sync-status / source-map runs.
    fn render_text(&self) {
        println!("{}: {}", self.command, self.status);
        println!("path: {}", self.path);
        println!("profile: {}", self.profile);
        println!(
            "effective provider: {} ({})",
            self.effective_provider, self.effective_provider_kind
        );
        println!("configured provider: {}", self.configured_provider);
        if let Some(sync_status) = &self.sync_status {
            println!("sync status: {}", sync_status.status);
            println!("{}", sync_status.message);
        }
        if let Some(source_map) = &self.source_map {
            println!("source map: {}", source_map.status);
            println!("{}", source_map.message);
            // Only print a count when the map actually lists sources.
            if !source_map.sources.is_empty() {
                println!("sources: {}", source_map.sources.len());
            }
        }
        for warning in &self.warnings {
            println!("warning: {warning}");
        }
    }
}
/// One recall hit in the provider-neutral wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecallResult {
    pub reference: RecallProviderReference,
    // Scope string (e.g. "profile", "repo"); free-form from providers.
    pub scope: String,
    pub title: String,
    pub rank: usize,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub score: Option<f64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub preview: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub detail: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provenance: Option<RecallProvenance>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub temporal: Option<RecallTemporalHints>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub memory_class: Option<String>,
    pub expansion: RecallExpansion,
    // Never accepted from providers (skip_deserializing); computed locally
    // after deserialization (see apply_external_content_trust call sites).
    #[serde(default, skip_deserializing, skip_serializing_if = "Option::is_none")]
    pub content_trust: Option<ContentTrust>,
}
/// Provider-native identity of a recall item.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecallProviderReference {
    pub provider: String,
    pub kind: String,
    pub native_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub native_scope: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub uri: Option<String>,
}
/// Optional source attribution for a recall item.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecallProvenance {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_ref: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub surface_path: Option<String>,
}
/// Optional timestamps for a recall item.
/// NOTE(review): timestamp string format is provider-defined here — confirm
/// whether RFC 3339 is required by the protocol.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecallTemporalHints {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observed_at: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub updated_at: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub valid_at: Option<String>,
}
/// Whether a result can be expanded into related results.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecallExpansion {
    pub available: bool,
}
/// Request sent to external providers (one JSON object per line).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RecallProviderRequest {
    schema_version: u32,
    operation: String,
    profile: String,
    repo_root: PathBuf,
    locality_id: String,
    timeout_s: u64,
    scopes: Vec<String>,
    inputs: BTreeMap<String, String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    query: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    native_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    limit: Option<usize>,
}
/// Response received from external providers; every payload field is
/// optional so each operation fills only what it needs.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RecallProviderResponse {
    schema_version: u32,
    ok: bool,
    provider: String,
    #[serde(default)]
    error: Option<String>,
    #[serde(default)]
    health: Option<ProviderHealthPayload>,
    #[serde(default)]
    result: Option<RecallResult>,
    #[serde(default)]
    results: Option<Vec<RecallResult>>,
    #[serde(default)]
    sync_status: Option<AdvisoryIngestSyncStatusView>,
    #[serde(default)]
    source_map: Option<AdvisoryIngestSourceMapView>,
}
/// Health payload embedded in a provider response.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ProviderHealthPayload {
    status: String,
    #[serde(default)]
    message: Option<String>,
    #[serde(default)]
    checked_at_epoch_s: Option<u64>,
}
/// Borrowed operation arguments threaded through request building.
#[derive(Debug, Clone, Copy)]
struct OperationArgs<'a> {
    operation: &'a str,
    query: Option<&'a str>,
    native_id: Option<&'a str>,
    limit: Option<usize>,
}
/// A long-lived external provider process plus its I/O plumbing.
#[derive(Debug)]
struct CommandSessionChild {
    provider_name: String,
    timeout_s: u64,
    child: Child,
    // Buffered writer over the child's stdin; None after close_stdin().
    stdin: Option<BufWriter<ChildStdin>>,
    // Lines (or read errors) produced by the stdout reader thread.
    responses: mpsc::Receiver<std::io::Result<String>>,
    // Size-capped stderr capture shared with the stderr reader thread.
    stderr_buffer: Arc<Mutex<String>>,
    stdout_handle: Option<thread::JoinHandle<()>>,
    stderr_handle: Option<thread::JoinHandle<()>>,
}
/// Failure classes for a single command-session request.
#[derive(Debug)]
enum CommandSessionRequestError {
    Write(std::io::Error),
    Read(std::io::Error),
    Protocol(anyhow::Error),
    Timeout,
    Closed,
}
impl CommandSessionChild {
    /// Spawns the provider command with fully piped stdio and starts the
    /// background stdout/stderr reader threads.
    ///
    /// # Errors
    /// Fails when the command is empty, the binary is not on PATH, or the
    /// process cannot be spawned.
    fn spawn(provider: &ResolvedRecallProvider, config: &ExternalCommandConfig) -> Result<Self> {
        use std::process::{Command, Stdio};
        let binary = config
            .command
            .first()
            .ok_or_else(|| anyhow!("recall provider `{}` has an empty command", provider.name))?;
        let resolved_binary = find_in_path(binary).ok_or_else(|| {
            anyhow!(
                "external recall provider `{}`: binary `{binary}` not found on PATH",
                provider.name
            )
        })?;
        let mut child = Command::new(&resolved_binary)
            .args(&config.command[1..])
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .with_context(|| {
                format!(
                    "external recall provider `{}`: failed to spawn `{}`",
                    provider.name,
                    resolved_binary.display()
                )
            })?;
        // All three pipes were requested above, so take() cannot fail here.
        let stdin = Some(BufWriter::new(
            child.stdin.take().expect("stdin should be piped"),
        ));
        let stdout = child.stdout.take().expect("stdout should be piped");
        let stderr = child.stderr.take().expect("stderr should be piped");
        let (responses_tx, responses) = mpsc::channel();
        let stderr_buffer = Arc::new(Mutex::new(String::new()));
        let stdout_handle = Some(spawn_stdout_reader(stdout, responses_tx));
        let stderr_handle = Some(spawn_stderr_reader(stderr, stderr_buffer.clone()));
        Ok(Self {
            provider_name: provider.name.clone(),
            timeout_s: config.timeout_s,
            child,
            stdin,
            responses,
            stderr_buffer,
            stdout_handle,
            stderr_handle,
        })
    }
    /// Writes one JSON request line and waits up to `timeout_s` for a single
    /// response line from the stdout reader thread.
    ///
    /// On timeout the child is force-killed so a wedged provider cannot
    /// leave a half-written exchange on the pipe.
    fn request(
        &mut self,
        request: &RecallProviderRequest,
    ) -> std::result::Result<RecallProviderResponse, CommandSessionRequestError> {
        let json = serde_json::to_vec(request)
            .with_context(|| {
                format!(
                    "failed to serialize request for provider `{}`",
                    self.provider_name
                )
            })
            .map_err(CommandSessionRequestError::Protocol)?;
        // Borrow stdin once instead of repeating `as_mut().expect(..)` for
        // each of the three writes.
        let stdin = self.stdin.as_mut().expect("command-session stdin");
        stdin
            .write_all(&json)
            .map_err(CommandSessionRequestError::Write)?;
        stdin
            .write_all(b"\n")
            .map_err(CommandSessionRequestError::Write)?;
        stdin.flush().map_err(CommandSessionRequestError::Write)?;
        match self
            .responses
            .recv_timeout(Duration::from_secs(self.timeout_s))
        {
            Ok(Ok(line)) => parse_and_validate_response(&self.provider_name, line.trim_end())
                .map_err(CommandSessionRequestError::Protocol),
            Ok(Err(error)) => Err(CommandSessionRequestError::Read(error)),
            Err(mpsc::RecvTimeoutError::Timeout) => {
                self.force_kill();
                Err(CommandSessionRequestError::Timeout)
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => Err(CommandSessionRequestError::Closed),
        }
    }
    /// Polls for child exit until `deadline`, then kills (and reaps) it if
    /// still running.
    fn wait_for_exit(&mut self, deadline: Instant) {
        loop {
            match self.child.try_wait() {
                Ok(Some(_)) => break,
                Ok(None) if Instant::now() >= deadline => {
                    let _ = self.child.kill();
                    let _ = self.child.wait();
                    break;
                }
                Ok(None) => thread::sleep(Duration::from_millis(10)),
                Err(_) => break,
            }
        }
    }
    /// Kills the child immediately and reaps it to avoid a zombie.
    fn force_kill(&mut self) {
        let _ = self.child.kill();
        let _ = self.child.wait();
    }
    /// Drops our stdin handle (flushing first) so the child observes EOF.
    fn close_stdin(&mut self) {
        if let Some(mut stdin) = self.stdin.take() {
            let _ = stdin.flush();
        }
    }
    /// Returns the stderr captured so far (empty on lock poisoning).
    fn stderr_snippet(&self) -> String {
        self.stderr_buffer
            .lock()
            .map(|buffer| buffer.clone())
            .unwrap_or_default()
    }
}
impl Drop for CommandSessionChild {
    /// Best-effort shutdown: close stdin so the child sees EOF, give it two
    /// seconds to exit (then kill), and finally join both reader threads —
    /// joining is safe only after the child is gone and its pipes closed.
    fn drop(&mut self) {
        self.close_stdin();
        self.wait_for_exit(Instant::now() + Duration::from_secs(2));
        if let Some(handle) = self.stdout_handle.take() {
            let _ = handle.join();
        }
        if let Some(handle) = self.stderr_handle.take() {
            let _ = handle.join();
        }
    }
}
/// Builds a descriptive error for a failed command-session request,
/// appending the captured stderr snippet when one is available.
fn command_session_error(
    provider_name: &str,
    timeout_s: u64,
    error: &CommandSessionRequestError,
    session: Option<&CommandSessionChild>,
) -> anyhow::Error {
    let stderr = session
        .map(CommandSessionChild::stderr_snippet)
        .filter(|value| !value.trim().is_empty());
    // Base message per failure class; the stderr suffix is appended uniformly.
    let base = match error {
        CommandSessionRequestError::Write(source) => format!(
            "external recall provider `{provider_name}` command-session write failed: {source}"
        ),
        CommandSessionRequestError::Read(source) => format!(
            "external recall provider `{provider_name}` command-session read failed: {source}"
        ),
        CommandSessionRequestError::Protocol(source) => {
            format!("external recall provider `{provider_name}` protocol error: {source}")
        }
        CommandSessionRequestError::Timeout => {
            format!("external recall provider `{provider_name}` timed out after {timeout_s}s")
        }
        CommandSessionRequestError::Closed => format!(
            "external recall provider `{provider_name}` command-session closed before responding"
        ),
    };
    match stderr {
        Some(stderr) => anyhow!("{base}; stderr: {}", stderr.trim()),
        None => anyhow!("{base}"),
    }
}
/// Forwards newline-terminated stdout lines from the child to `sender`.
///
/// The thread ends on EOF, on a read error (forwarded once), or when the
/// receiving side of the channel has been dropped.
fn spawn_stdout_reader(
    stdout: ChildStdout,
    sender: mpsc::Sender<std::io::Result<String>>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut reader = BufReader::new(stdout);
        loop {
            // A fresh buffer per line: each line is moved into the channel.
            let mut line = String::new();
            let read = match reader.read_line(&mut line) {
                Ok(count) => count,
                Err(error) => {
                    let _ = sender.send(Err(error));
                    return;
                }
            };
            if read == 0 || sender.send(Ok(line)).is_err() {
                return;
            }
        }
    })
}
/// Accumulates the child's stderr into a shared, size-capped buffer used for
/// diagnostics; keeps draining the pipe even after the cap so the child can
/// never block on a full stderr pipe.
fn spawn_stderr_reader(stderr: ChildStderr, buffer: Arc<Mutex<String>>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut reader = BufReader::new(stderr);
        let mut chunk = [0_u8; 512];
        loop {
            match reader.read(&mut chunk) {
                Ok(0) => break,
                Ok(read) => {
                    if let Ok(mut stderr_buffer) = buffer.lock() {
                        // Retain at most ~1 KiB of stderr for error messages.
                        let remaining = 1024usize.saturating_sub(stderr_buffer.len());
                        if remaining == 0 {
                            // Cap reached: discard but keep reading.
                            continue;
                        }
                        let text = String::from_utf8_lossy(&chunk[..read]);
                        // NOTE(review): `remaining` counts bytes while `take`
                        // counts chars, so multi-byte output can slightly
                        // exceed the 1 KiB cap — confirm this is acceptable.
                        stderr_buffer.push_str(&text.chars().take(remaining).collect::<String>());
                    } else {
                        // Poisoned lock: stop capturing entirely.
                        break;
                    }
                }
                Err(_) => break,
            }
        }
    })
}
/// A resolved recall provider bound to its execution context (repo root,
/// state layout, locality), plus a lazily created command session.
#[derive(Debug)]
pub(crate) struct ResolvedRecallAdapter {
    repo_root: PathBuf,
    layout: StateLayout,
    locality_id: String,
    resolved: ResolvedRecallProvider,
    // Populated on first use for command-session mode providers.
    command_session: Option<CommandSessionChild>,
}
impl ResolvedRecallAdapter {
    /// Binds a resolved provider to the repo/layout/locality context that
    /// every provider request needs.
    fn new(
        repo_root: &Path,
        layout: &StateLayout,
        locality_id: &str,
        resolved: ResolvedRecallProvider,
    ) -> Self {
        Self {
            repo_root: repo_root.to_path_buf(),
            layout: layout.clone(),
            locality_id: locality_id.to_owned(),
            resolved,
            command_session: None,
        }
    }
    /// True when this adapter is backed by the built-in markdown provider.
    pub(crate) fn is_builtin_markdown(&self) -> bool {
        self.resolved.kind == ProviderKind::AuthoredMarkdown
    }
    /// Builds the wire request for an external-command operation.
    ///
    /// Bails for the authored-markdown backend, which never goes through the
    /// external request path.
    fn build_request(&self, operation: OperationArgs<'_>) -> Result<RecallProviderRequest> {
        let config = match &self.resolved.backend {
            ProviderBackend::ExternalCommand(config) => config,
            ProviderBackend::AuthoredMarkdown => {
                bail!(
                    "internal error: authored markdown provider operation `{}` must be dispatched through the builtin path",
                    operation.operation
                )
            }
        };
        Ok(RecallProviderRequest {
            schema_version: 1,
            operation: operation.operation.to_owned(),
            profile: self.layout.profile().as_str().to_owned(),
            repo_root: self.repo_root.clone(),
            locality_id: self.locality_id.clone(),
            timeout_s: config.timeout_s,
            scopes: self.resolved.allowed_scopes.clone(),
            inputs: self.resolved.inputs.clone(),
            query: operation.query.map(str::to_owned),
            native_id: operation.native_id.map(str::to_owned),
            limit: operation.limit,
        })
    }
    /// Executes one operation against the external backend.
    ///
    /// One-shot mode spawns a process per call; command-session mode reuses
    /// a long-lived child, respawning it once after a pipe-level failure
    /// (write/read/closed) before giving up. The session is discarded after
    /// any final failure so the next call starts fresh.
    fn execute_external_operation(
        &mut self,
        operation: OperationArgs<'_>,
    ) -> Result<RecallProviderResponse> {
        let config = match &self.resolved.backend {
            ProviderBackend::ExternalCommand(config) => config.clone(),
            ProviderBackend::AuthoredMarkdown => {
                bail!(
                    "internal error: authored markdown provider operation `{}` must be dispatched through the builtin path",
                    operation.operation
                )
            }
        };
        let request = self.build_request(operation)?;
        match config.mode {
            ExternalCommandMode::OneShot => execute_external(&self.resolved, &config, &request),
            ExternalCommandMode::CommandSession => {
                // Lazily start the session on first use.
                if self.command_session.is_none() {
                    self.command_session =
                        Some(CommandSessionChild::spawn(&self.resolved, &config)?);
                }
                match self
                    .command_session
                    .as_mut()
                    .expect("command session")
                    .request(&request)
                {
                    Ok(response) => Ok(response),
                    // Pipe-level failures: respawn once and retry the request.
                    Err(CommandSessionRequestError::Write(_))
                    | Err(CommandSessionRequestError::Read(_))
                    | Err(CommandSessionRequestError::Closed) => {
                        self.command_session = None;
                        self.command_session =
                            Some(CommandSessionChild::spawn(&self.resolved, &config)?);
                        let result = self
                            .command_session
                            .as_mut()
                            .expect("respawned command session")
                            .request(&request);
                        match result {
                            Ok(response) => Ok(response),
                            Err(error) => {
                                // Capture stderr before dropping the session.
                                let failure = command_session_error(
                                    &self.resolved.name,
                                    config.timeout_s,
                                    &error,
                                    self.command_session.as_ref(),
                                );
                                self.command_session = None;
                                Err(failure)
                            }
                        }
                    }
                    // Timeout / protocol errors are not retried.
                    Err(error) => {
                        let failure = command_session_error(
                            &self.resolved.name,
                            config.timeout_s,
                            &error,
                            self.command_session.as_ref(),
                        );
                        self.command_session = None;
                        Err(failure)
                    }
                }
            }
        }
    }
    /// Runs a search via the builtin path or the external command; external
    /// results are trust-annotated then truncated to `limit` locally.
    fn search_results(&mut self, query: &str, limit: usize) -> Result<Vec<RecallResult>> {
        match &self.resolved.backend {
            ProviderBackend::AuthoredMarkdown => search_results(
                &self.resolved,
                &self.repo_root,
                &self.layout,
                &self.locality_id,
                query,
                limit,
            ),
            ProviderBackend::ExternalCommand(_) => {
                let mut results = self
                    .execute_external_operation(OperationArgs {
                        operation: "search",
                        query: Some(query),
                        native_id: None,
                        limit: Some(limit),
                    })?
                    .results
                    .unwrap_or_default();
                apply_external_content_trust(&mut results);
                results.truncate(limit);
                Ok(results)
            }
        }
    }
    /// Describes one item by native id; external responses may carry the item
    /// in `result`, `results`, or both (chained in that order).
    fn describe_results(&mut self, native_id: &str) -> Result<Vec<RecallResult>> {
        match &self.resolved.backend {
            ProviderBackend::AuthoredMarkdown => describe_result(
                &self.resolved,
                &self.repo_root,
                &self.layout,
                &self.locality_id,
                native_id,
            ),
            ProviderBackend::ExternalCommand(_) => {
                let response = self.execute_external_operation(OperationArgs {
                    operation: "describe",
                    query: None,
                    native_id: Some(native_id),
                    limit: Some(1),
                })?;
                let mut results = response
                    .result
                    .into_iter()
                    .chain(response.results.unwrap_or_default())
                    .collect::<Vec<_>>();
                apply_external_content_trust(&mut results);
                Ok(results)
            }
        }
    }
    /// Expands an item into related results; `limit` is enforced by the
    /// builtin path and passed as a hint to external providers (their
    /// results are not truncated here, unlike search).
    fn expand_results(&mut self, native_id: &str, limit: usize) -> Result<Vec<RecallResult>> {
        match &self.resolved.backend {
            ProviderBackend::AuthoredMarkdown => expand_result(
                &self.resolved,
                &self.repo_root,
                &self.layout,
                &self.locality_id,
                native_id,
                limit,
            ),
            ProviderBackend::ExternalCommand(_) => {
                let mut results = self
                    .execute_external_operation(OperationArgs {
                        operation: "expand",
                        query: None,
                        native_id: Some(native_id),
                        limit: Some(limit),
                    })?
                    .results
                    .unwrap_or_default();
                apply_external_content_trust(&mut results);
                Ok(results)
            }
        }
    }
}
impl ResolvedIngestProvider {
    /// Re-packages this ingest provider in the recall-provider shape so the
    /// shared external-command transport can run its operations; the recall
    /// capability set is intentionally left empty.
    fn as_transport_provider(&self) -> ResolvedRecallProvider {
        let Self {
            name,
            kind,
            source,
            allowed_scopes,
            inputs,
            backend,
            ..
        } = self;
        ResolvedRecallProvider {
            name: name.clone(),
            kind: *kind,
            source: *source,
            capabilities: RecallCapabilities::default(),
            allowed_scopes: allowed_scopes.clone(),
            inputs: inputs.clone(),
            backend: backend.clone(),
        }
    }
}
impl DetailedRecallProvider for ResolvedRecallAdapter {
    /// Name/kind identity reported to the detailed recall pipeline.
    fn descriptor(&self) -> DetailedProviderDescriptor {
        DetailedProviderDescriptor {
            name: self.resolved.name.clone(),
            kind: self.resolved.kind.as_str().to_owned(),
        }
    }
    /// Re-derives detailed capabilities from the resolved capability names.
    fn capabilities(&self) -> DetailedRecallProviderCapabilities {
        DetailedRecallProviderCapabilities::from_declared(&self.resolved.capabilities.names())
    }
    /// Health is always "ok" for the builtin path; external providers are
    /// asked via a `health` operation and must return a `health` payload.
    fn health(&mut self) -> Result<DetailedRecallHealthView> {
        match &self.resolved.backend {
            ProviderBackend::AuthoredMarkdown => Ok(DetailedRecallHealthView {
                status: "ok".to_owned(),
                message: Some(
                    "authored markdown recall is available from local runtime memory".to_owned(),
                ),
            }),
            ProviderBackend::ExternalCommand(_) => {
                let response = self.execute_external_operation(OperationArgs {
                    operation: "health",
                    query: None,
                    native_id: None,
                    limit: None,
                })?;
                let health = response.health.ok_or_else(|| {
                    anyhow!(
                        "recall provider `{}` returned no `health` payload",
                        self.resolved.name
                    )
                })?;
                Ok(DetailedRecallHealthView {
                    status: health.status,
                    // Fall back to a generic message if the provider omits one.
                    message: Some(health.message.unwrap_or_else(|| {
                        format!(
                            "recall provider `{}` responded to health",
                            self.resolved.name
                        )
                    })),
                })
            }
        }
    }
    /// Searches with a fixed internal raw-result limit of 16.
    fn search(&mut self, query: &str) -> Result<Vec<DetailedRecallResult>> {
        Ok(self
            .search_results(query, 16)?
            .into_iter()
            .map(map_detailed_result)
            .collect())
    }
    /// Describe keeps at most the first result for the referenced id.
    fn describe(
        &mut self,
        reference: &DetailedProviderReference,
    ) -> Result<Option<DetailedRecallResult>> {
        Ok(self
            .describe_results(&reference.provider_id)?
            .into_iter()
            .next()
            .map(map_detailed_result))
    }
    /// Expand with a builtin-path limit of 1 related result.
    fn expand(
        &mut self,
        reference: &DetailedProviderReference,
    ) -> Result<Vec<DetailedRecallResult>> {
        Ok(self
            .expand_results(&reference.provider_id, 1)?
            .into_iter()
            .map(map_detailed_result)
            .collect())
    }
}
fn map_detailed_result(result: RecallResult) -> DetailedRecallResult {
DetailedRecallResult {
provider_ref: DetailedProviderReference {
provider_name: result.reference.provider.clone(),
provider_kind: result.reference.kind.clone(),
provider_id: result.reference.native_id.clone(),
provider_scope: result.reference.native_scope.clone(),
},
ccd_scope: DetailedRecallScope::parse(&result.scope).unwrap_or(DetailedRecallScope::Repo),
title: result.title.clone(),
summary: result
.preview
.clone()
.or_else(|| result.detail.clone())
.unwrap_or_else(|| result.title.clone()),
rank: result.rank as u32,
preview: result.preview.clone(),
detail: result.detail.clone(),
can_expand: result.expansion.available,
memory_class_hints: result.memory_class.into_iter().collect(),
provenance: DetailedRecallProvenance {
source_ref: result
.provenance
.as_ref()
.and_then(|value| value.source_ref.clone()),
provider_uri: result.reference.uri.clone(),
surface_path: result
.provenance
.as_ref()
.and_then(|value| value.surface_path.clone()),
},
temporal: DetailedRecallTemporalHints {
observed_at: result
.temporal
.as_ref()
.and_then(|value| value.observed_at.clone()),
valid_at: result
.temporal
.as_ref()
.and_then(|value| value.valid_at.clone()),
updated_at: result
.temporal
.as_ref()
.and_then(|value| value.updated_at.clone()),
},
}
}
/// Resolve the effective memory-provider configuration for one locality and
/// report it as a `MemoryProviderInspection` (view + issues + resolved
/// provider + optional fallback origin).
///
/// Any configuration or health problem demotes `status` from "loaded" to
/// "fallback" and swaps in the builtin markdown provider; the function only
/// returns `Err` for config-load failures, never for provider problems.
pub fn inspect_provider(
repo_root: &Path,
layout: &StateLayout,
locality_id: &str,
check_health: bool,
) -> Result<MemoryProviderInspection> {
let effective = load_effective_config(layout, locality_id)?;
let mut issues = Vec::new();
let mut status = "loaded";
// v1 only supports authored markdown as the authoritative provider; any
// other configured name is reported as an issue and overridden.
let authoritative_provider = if normalize_name(&effective.authoritative_provider)
== BUILTIN_PROVIDER_NAME
{
BUILTIN_PROVIDER_NAME.to_owned()
} else {
let profile_path = layout.profile_config_path().display().to_string();
issues.push(MemoryProviderIssue {
check: "memory_provider_config",
file: profile_path,
message: format!(
"authoritative memory provider `{}` is not supported in v1; authored markdown remains authoritative",
effective.authoritative_provider
),
});
status = "fallback";
BUILTIN_PROVIDER_NAME.to_owned()
};
// Resolve the recall provider; on failure remember which provider we fell
// back from so callers can surface it.
let mut fallback_from = None;
let mut resolved = match resolve_recall_provider(&effective, layout, locality_id) {
Ok(provider) => provider,
Err(issue) => {
fallback_from = effective.configured_recall_provider.clone();
issues.push(issue);
status = "fallback";
builtin_markdown_provider(ProviderSource::BuiltinFallback)
}
};
let health = if check_health {
match inspect_health(&resolved, repo_root, layout, locality_id) {
Ok(view) => view,
Err(error) => {
// A failed health probe only forces a fallback for non-builtin
// providers; authored markdown has nothing to fall back to.
if resolved.kind != ProviderKind::AuthoredMarkdown {
fallback_from = Some(resolved.name.clone());
issues.push(MemoryProviderIssue {
check: "memory_provider_health",
file: resolved.name.clone(),
message: format!(
"configured recall provider `{}` failed health inspection: {error:#}",
resolved.name
),
});
status = "fallback";
resolved = builtin_markdown_provider(ProviderSource::BuiltinFallback);
}
ProviderHealthView {
status: "error".to_owned(),
message: error.to_string(),
checked_at_epoch_s: Some(now_epoch_s()?),
}
}
}
} else {
ProviderHealthView {
status: "not_checked".to_owned(),
message: "health inspection skipped".to_owned(),
checked_at_epoch_s: None,
}
};
let effective_view = ProviderIdentityView {
name: resolved.name.clone(),
kind: resolved.kind.as_str().to_owned(),
source: resolved.source.as_str().to_owned(),
capabilities: resolved.capabilities.names(),
allowed_scopes: resolved
.allowed_scopes
.iter()
.map(|scope| canonical_scope_name(scope).to_owned())
.collect(),
};
// May push additional issues (e.g. an unsupported policy value).
let start_policy = resolve_start_recall_policy(
effective.start_recall_policy.as_deref(),
effective.configured_recall_provider.as_deref(),
&resolved,
&mut issues,
layout.profile_config_path().display().to_string(),
);
// Snapshot the recall-side issue count before appending ingest issues so
// recall problems alone can demote the status below.
let recall_issue_count = issues.len();
let (advisory_ingest_provider, ingest_issues) =
inspect_advisory_ingest_provider(&effective, layout, locality_id);
issues.extend(ingest_issues);
if recall_issue_count > 0 && status == "loaded" {
status = "fallback";
}
Ok(MemoryProviderInspection {
view: MemoryProviderView {
status,
authoritative_provider,
configured_recall_provider: effective.configured_recall_provider.clone(),
effective_recall_provider: effective_view,
start_recall_policy: StartRecallPolicyView {
mode: start_policy.as_str().to_owned(),
search_limit: START_RECALL_LIMIT,
preview_chars: START_RECALL_PREVIEW_CHARS,
},
health,
issues: issues.iter().map(|issue| issue.message.clone()).collect(),
advisory_ingest_provider,
},
issues,
resolved,
fallback_from,
})
}
pub fn inspect_current_dir_for_describe() -> MemoryProviderView {
let cwd = match env::current_dir() {
Ok(cwd) => cwd,
Err(error) => {
return context_unavailable_view(format!(
"failed to resolve current directory: {error}"
))
}
};
let profile_name = match profile::resolve(None) {
Ok(profile) => profile,
Err(error) => {
return context_unavailable_view(format!("failed to resolve profile: {error:#}"))
}
};
let layout = match StateLayout::resolve(&cwd, profile_name) {
Ok(layout) => layout,
Err(error) => {
return context_unavailable_view(format!("failed to resolve CCD layout: {error:#}"))
}
};
let Some(marker) = repo_marker::load(&cwd).ok().flatten() else {
return context_unavailable_view(
"current directory is not linked to a CCD project; run `ccd attach --path .` first"
.to_owned(),
);
};
match inspect_provider(&cwd, &layout, &marker.locality_id, false) {
Ok(inspection) => inspection.view,
Err(error) => context_unavailable_view(format!(
"failed to inspect memory-provider context from the current directory: {error:#}"
)),
}
}
/// Public entry point for a memory search: validates the query, then
/// delegates to the shared recall driver.
pub fn search(
    repo_root: &Path,
    explicit_profile: Option<&str>,
    query: &str,
    limit: Option<usize>,
) -> Result<RecallCommandReport> {
    // Reject whitespace-only queries before any provider work happens.
    if query.trim().is_empty() {
        bail!("memory search requires a non-empty query");
    }
    let kind = RecallCommandKind::Search;
    run_recall_command(repo_root, explicit_profile, kind, Some(query), None, limit)
}
/// Public entry point for a memory describe: validates the id, then
/// delegates to the shared recall driver with no limit.
pub fn describe(
    repo_root: &Path,
    explicit_profile: Option<&str>,
    native_id: &str,
) -> Result<RecallCommandReport> {
    // Describe needs a concrete provider-native id to look up.
    if native_id.trim().is_empty() {
        bail!("memory describe requires a non-empty native id");
    }
    let kind = RecallCommandKind::Describe;
    run_recall_command(repo_root, explicit_profile, kind, None, Some(native_id), None)
}
/// Public entry point for a memory expand: validates the id, then delegates
/// to the shared recall driver with the caller's limit.
pub fn expand(
    repo_root: &Path,
    explicit_profile: Option<&str>,
    native_id: &str,
    limit: Option<usize>,
) -> Result<RecallCommandReport> {
    // Expand needs a concrete provider-native id to start from.
    if native_id.trim().is_empty() {
        bail!("memory expand requires a non-empty native id");
    }
    let kind = RecallCommandKind::Expand;
    run_recall_command(repo_root, explicit_profile, kind, None, Some(native_id), limit)
}
/// Public entry point for the advisory-ingest `sync-status` probe.
pub fn sync_status(
    repo_root: &Path,
    explicit_profile: Option<&str>,
) -> Result<AdvisoryIngestCommandReport> {
    let kind = AdvisoryIngestCommandKind::SyncStatus;
    run_advisory_ingest_command(repo_root, explicit_profile, kind)
}
/// Public entry point for the advisory-ingest `source-map` export.
pub fn source_map(
    repo_root: &Path,
    explicit_profile: Option<&str>,
) -> Result<AdvisoryIngestCommandReport> {
    let kind = AdvisoryIngestCommandKind::SourceMap;
    run_advisory_ingest_command(repo_root, explicit_profile, kind)
}
/// Shared driver for the search/describe/expand recall commands.
///
/// Resolves the layout and provider, runs the command against the resolved
/// provider, and — if that provider is external and fails — retries once
/// against the builtin markdown provider, recording the fallback. Issues
/// from inspection are surfaced as warnings on the report.
fn run_recall_command(
repo_root: &Path,
explicit_profile: Option<&str>,
kind: RecallCommandKind,
query: Option<&str>,
native_id: Option<&str>,
limit: Option<usize>,
) -> Result<RecallCommandReport> {
let validated_limit = validate_recall_limit(kind, limit)?;
let profile_name = profile::resolve(explicit_profile)?;
let layout = StateLayout::resolve(repo_root, profile_name)?;
let locality_id = ensure_project_linked(repo_root)?;
// Health is not probed here; a runtime failure below triggers fallback.
let inspection = inspect_provider(repo_root, &layout, &locality_id, false)?;
let configured_provider = inspection.view.configured_recall_provider.clone();
let mut warnings = inspection
.issues
.iter()
.map(|issue| issue.message.clone())
.collect::<Vec<_>>();
let mut fallback_used = inspection.fallback_from.is_some();
let mut fallback_from = inspection.fallback_from.clone();
let mut effective_provider = inspection.resolved.name.clone();
let mut effective_provider_kind = inspection.resolved.kind.as_str().to_owned();
// First attempt against whichever provider inspection resolved.
let primary_result = execute_recall_command(
kind,
&inspection.resolved,
repo_root,
&layout,
&locality_id,
RecallCommandRequest {
query,
native_id,
limit: validated_limit,
},
);
let (status, results, message, ok) = match primary_result {
Ok(execution) => {
warnings.extend(execution.warnings);
(
if fallback_used {
"fallback"
} else if execution.results.is_empty() {
"empty"
} else {
"loaded"
},
execution.results,
None,
true,
)
}
// Only non-builtin providers get a second chance via the builtin
// markdown fallback; a builtin failure is terminal below.
Err(error) if inspection.resolved.kind != ProviderKind::AuthoredMarkdown => {
warnings.push(error.to_string());
fallback_used = true;
// Keep the original fallback origin if inspection already set one.
fallback_from.get_or_insert_with(|| inspection.resolved.name.clone());
let fallback = builtin_markdown_provider(ProviderSource::BuiltinFallback);
effective_provider = fallback.name.clone();
effective_provider_kind = fallback.kind.as_str().to_owned();
match execute_recall_command(
kind,
&fallback,
repo_root,
&layout,
&locality_id,
RecallCommandRequest {
query,
native_id,
limit: validated_limit,
},
) {
Ok(execution) => {
warnings.extend(execution.warnings);
("fallback", execution.results, None, true)
}
Err(fallback_error) => (
"error",
Vec::new(),
Some(format!(
"{error}; fallback provider also failed: {fallback_error}"
)),
false,
),
}
}
Err(error) => ("error", Vec::new(), Some(error.to_string()), false),
};
// Supply the per-kind "no results" message only when nothing else claimed
// the message slot.
let message = if message.is_some() {
message
} else if results.is_empty() {
Some(kind.empty_message().to_owned())
} else {
None
};
Ok(RecallCommandReport {
command: kind.command_name(),
ok,
status,
path: repo_root.display().to_string(),
profile: layout.profile().as_str().to_owned(),
query: query.map(str::to_owned),
native_id: native_id.map(str::to_owned),
limit,
configured_provider,
effective_provider,
effective_provider_kind,
fallback_used,
fallback_from,
results,
message,
warnings,
})
}
/// Shared driver for the advisory-ingest `sync-status`/`source-map` commands.
///
/// Unlike recall, ingest has no builtin fallback: a provider-resolution
/// issue is converted into a hard error. Each command kind requires its
/// matching payload in the provider response.
fn run_advisory_ingest_command(
repo_root: &Path,
explicit_profile: Option<&str>,
kind: AdvisoryIngestCommandKind,
) -> Result<AdvisoryIngestCommandReport> {
let profile_name = profile::resolve(explicit_profile)?;
let layout = StateLayout::resolve(repo_root, profile_name)?;
let locality_id = ensure_project_linked(repo_root)?;
let effective = load_effective_config(&layout, &locality_id)?;
// Resolution issues become errors here (no ingest fallback exists).
let resolved = resolve_ingest_provider(&effective, &layout, &locality_id)
.map_err(|issue| anyhow!("{}", issue.message))?;
let response =
execute_advisory_ingest_operation(repo_root, &layout, &locality_id, &resolved, kind)?;
// The report's top-level status mirrors the payload's own status field.
let (status, sync_status, source_map) = match kind {
AdvisoryIngestCommandKind::SyncStatus => {
let sync_status = response.sync_status.ok_or_else(|| {
anyhow!(
"advisory ingest provider `{}` returned no `sync_status` payload",
resolved.name
)
})?;
(sync_status.status.clone(), Some(sync_status), None)
}
AdvisoryIngestCommandKind::SourceMap => {
let source_map = response.source_map.ok_or_else(|| {
anyhow!(
"advisory ingest provider `{}` returned no `source_map` payload",
resolved.name
)
})?;
(source_map.status.clone(), None, Some(source_map))
}
};
Ok(AdvisoryIngestCommandReport {
command: kind.command_name(),
ok: true,
status,
path: repo_root.display().to_string(),
profile: layout.profile().as_str().to_owned(),
configured_provider: effective
.configured_ingest_provider
.clone()
.unwrap_or_else(|| resolved.name.clone()),
effective_provider: resolved.name.clone(),
effective_provider_kind: resolved.kind.as_str().to_owned(),
sync_status,
source_map,
warnings: Vec::new(),
})
}
/// Run one advisory-ingest operation through the recall transport adapter.
fn execute_advisory_ingest_operation(
    repo_root: &Path,
    layout: &StateLayout,
    locality_id: &str,
    provider: &ResolvedIngestProvider,
    kind: AdvisoryIngestCommandKind,
) -> Result<RecallProviderResponse> {
    // Ingest reporting reuses the recall transport; only the operation name
    // differs, and no query/id/limit arguments apply.
    let args = OperationArgs {
        operation: kind.operation_name(),
        query: None,
        native_id: None,
        limit: None,
    };
    let mut adapter = ResolvedRecallAdapter::new(
        repo_root,
        layout,
        locality_id,
        provider.as_transport_provider(),
    );
    adapter.execute_external_operation(args)
}
/// Reject an explicit limit of zero for search/expand; describe ignores the
/// limit, and `None` always passes through unchanged.
fn validate_recall_limit(kind: RecallCommandKind, limit: Option<usize>) -> Result<Option<usize>> {
    if limit == Some(0) {
        match kind {
            RecallCommandKind::Search => bail!("memory search requires a limit greater than 0"),
            RecallCommandKind::Expand => bail!("memory expand requires a limit greater than 0"),
            RecallCommandKind::Describe => {}
        }
    }
    Ok(limit)
}
/// Clamp provider results to the per-command maximum, recording a warning
/// when truncation occurs.
fn bound_recall_results(
    kind: RecallCommandKind,
    mut results: Vec<RecallResult>,
    limit: Option<usize>,
) -> RecallCommandExecution {
    // Every command kind has a cap, so the former `Option` wrapper around
    // `max_results` was dead code; describe is always capped at one result.
    let max_results = match kind {
        RecallCommandKind::Search => limit.unwrap_or(DEFAULT_RECALL_SEARCH_LIMIT),
        RecallCommandKind::Describe => 1,
        RecallCommandKind::Expand => limit.unwrap_or(DEFAULT_RECALL_EXPAND_LIMIT),
    };
    let mut warnings = Vec::new();
    if results.len() > max_results {
        let warning = match kind {
            RecallCommandKind::Search => format!(
                "provider returned {} results for search with limit {}; truncating",
                results.len(),
                max_results
            ),
            RecallCommandKind::Describe => format!(
                "provider returned {} results for describe; truncating to 1",
                results.len()
            ),
            RecallCommandKind::Expand => format!(
                "provider returned {} results for expand with limit {}; truncating",
                results.len(),
                max_results
            ),
        };
        warnings.push(warning);
        results.truncate(max_results);
    }
    RecallCommandExecution { results, warnings }
}
/// Dispatch one recall command to the provider and bound the result set.
///
/// The `expect` calls encode a caller invariant: `run_recall_command` always
/// supplies `query` for search and `native_id` for describe/expand, so a
/// `None` there is a programming error, not user input.
fn execute_recall_command(
kind: RecallCommandKind,
provider: &ResolvedRecallProvider,
repo_root: &Path,
layout: &StateLayout,
locality_id: &str,
request: RecallCommandRequest<'_>,
) -> Result<RecallCommandExecution> {
let results = match kind {
RecallCommandKind::Search => search_results(
provider,
repo_root,
layout,
locality_id,
request.query.expect("search query"),
request.limit.unwrap_or(DEFAULT_RECALL_SEARCH_LIMIT),
)?,
RecallCommandKind::Describe => describe_result(
provider,
repo_root,
layout,
locality_id,
request.native_id.expect("describe native id"),
)?,
RecallCommandKind::Expand => expand_result(
provider,
repo_root,
layout,
locality_id,
request.native_id.expect("expand native id"),
request.limit.unwrap_or(DEFAULT_RECALL_EXPAND_LIMIT),
)?,
};
// Truncate over-long provider responses and collect truncation warnings.
Ok(bound_recall_results(kind, results, request.limit))
}
/// Resolve the locality id recorded by the repo marker, failing with
/// attach/link guidance when the checkout has never been linked.
fn ensure_project_linked(repo_root: &Path) -> Result<String> {
    match repo_marker::load(repo_root)? {
        Some(marker) => Ok(marker.locality_id),
        None => bail!(
            "current checkout is not linked to a CCD project; run `ccd attach --path {}` or `ccd link --path {}` first",
            repo_root.display(),
            repo_root.display()
        ),
    }
}
/// Run the optional provider recall at session start and shape it for the
/// startup view.
///
/// Early-outs: policy disabled, provider already fell back during
/// inspection, or no query derivable from the handoff. Successful results
/// are trimmed to preview-only form (detail dropped, preview truncated).
pub fn collect_startup_recall(
inspection: &MemoryProviderInspection,
repo_root: &Path,
layout: &StateLayout,
locality_id: &str,
handoff: &compiled::CompiledHandoffView,
) -> Result<StartupRecallView> {
let mode = inspection.view.start_recall_policy.mode.as_str();
if mode == StartRecallPolicy::Disabled.as_str() {
return Ok(StartupRecallView {
status: "disabled".to_owned(),
query: None,
provider: Some(inspection.resolved.name.clone()),
results: Vec::new(),
message: Some("startup provider recall is disabled".to_owned()),
});
}
// If inspection already fell back, do not probe the broken provider again
// at startup; report the fallback and continue from local memory.
if let Some(original) = &inspection.fallback_from {
return Ok(StartupRecallView {
status: "fallback".to_owned(),
query: None,
provider: Some(original.clone()),
results: Vec::new(),
message: Some(format!(
"configured recall provider `{original}` is unavailable; startup continues from local authored memory only"
)),
});
}
// The query comes from the compiled handoff; without one there is nothing
// meaningful to search for.
let Some(query) = derive_start_query(handoff) else {
return Ok(StartupRecallView {
status: "skipped".to_owned(),
query: None,
provider: Some(inspection.resolved.name.clone()),
results: Vec::new(),
message: Some(
"no continuity title or immediate action was available for startup recall"
.to_owned(),
),
});
};
let mut results = search_results(
&inspection.resolved,
repo_root,
layout,
locality_id,
&query,
START_RECALL_LIMIT,
)
.map_err(|error| anyhow!("startup provider recall failed: {error:#}"))?;
// Keep the startup view compact: previews only, capped in length.
for result in &mut results {
if let Some(preview) = &result.preview {
result.preview = Some(truncate(preview, START_RECALL_PREVIEW_CHARS));
}
result.detail = None;
}
let status = if results.is_empty() {
"empty"
} else {
"loaded"
};
let message = if results.is_empty() {
Some("provider recall returned no additive startup context".to_owned())
} else {
None
};
Ok(StartupRecallView {
status: status.to_owned(),
query: Some(query),
provider: Some(inspection.resolved.name.clone()),
results,
message,
})
}
/// Build the adapters used for session-start recall: an optional configured
/// provider plus a builtin markdown fallback that is always present.
///
/// Config-load or provider-resolution failures never abort startup; they
/// are captured in `config_error` and only the fallback adapter remains.
pub(crate) fn prepare_start_recall(
layout: &StateLayout,
repo_root: &Path,
locality_id: &str,
) -> PreparedStartRecall {
// The fallback adapter is renamed so output can distinguish it from a
// deliberately configured `markdown` provider.
let mut fallback_resolved = builtin_markdown_provider(ProviderSource::BuiltinFallback);
fallback_resolved.name = "markdown-native".to_owned();
let fallback = ResolvedRecallAdapter::new(repo_root, layout, locality_id, fallback_resolved);
match load_effective_config(layout, locality_id).and_then(|effective| {
let start = load_start_memory_config(layout, locality_id)?;
Ok((effective, start))
}) {
Ok((effective, start)) => {
let budget = start.budget();
let provider_name = effective.configured_recall_provider.clone();
// Start recall only runs when enabled AND a provider is configured.
if !start.enabled_for(provider_name.is_some()) || provider_name.is_none() {
return PreparedStartRecall {
provider: None,
fallback,
budget,
config_error: None,
};
}
match resolve_recall_provider(&effective, layout, locality_id) {
Ok(resolved) => PreparedStartRecall {
provider: Some(ResolvedRecallAdapter::new(
repo_root,
layout,
locality_id,
resolved,
)),
fallback,
budget,
config_error: None,
},
Err(issue) => PreparedStartRecall {
provider: None,
fallback,
budget,
config_error: Some(issue.message),
},
}
}
Err(error) => PreparedStartRecall {
provider: None,
fallback,
budget: StartRecallBudget::default(),
config_error: Some(error.to_string()),
},
}
}
/// Build the degraded `MemoryProviderView` returned when no CCD context can
/// be resolved; `message` explains which resolution step failed.
fn context_unavailable_view(message: String) -> MemoryProviderView {
    // Surface the builtin markdown identity with every scope allowed, since
    // no real configuration could be loaded.
    let effective_recall_provider = ProviderIdentityView {
        name: BUILTIN_PROVIDER_NAME.to_owned(),
        kind: ProviderKind::AuthoredMarkdown.as_str().to_owned(),
        source: ProviderSource::BuiltinFallback.as_str().to_owned(),
        capabilities: RecallCapabilities::all().names(),
        allowed_scopes: VALID_SCOPES
            .iter()
            .map(|scope| (*scope).to_owned())
            .collect(),
    };
    let start_recall_policy = StartRecallPolicyView {
        mode: StartRecallPolicy::Disabled.as_str().to_owned(),
        search_limit: START_RECALL_LIMIT,
        preview_chars: START_RECALL_PREVIEW_CHARS,
    };
    MemoryProviderView {
        status: "context_unavailable",
        authoritative_provider: BUILTIN_PROVIDER_NAME.to_owned(),
        configured_recall_provider: None,
        effective_recall_provider,
        start_recall_policy,
        health: ProviderHealthView {
            status: "not_checked".to_owned(),
            message,
            checked_at_epoch_s: None,
        },
        issues: Vec::new(),
        advisory_ingest_provider: advisory_ingest_context_unavailable_view(),
    }
}
/// Degraded advisory-ingest view used when the surrounding CCD context
/// could not be resolved at all.
fn advisory_ingest_context_unavailable_view() -> AdvisoryIngestProviderView {
    // Both sub-views carry the same status and explanatory message.
    let note = "ingest provider context unavailable";
    AdvisoryIngestProviderView {
        status: "context_unavailable",
        configured_provider: None,
        effective_provider: None,
        sync_status: AdvisoryIngestSyncStatusView {
            status: "context_unavailable".to_owned(),
            message: note.to_owned(),
            checked_at_epoch_s: None,
            latest_successful_ingest_at_epoch_s: None,
            lag_seconds: None,
            scoped_source_coverage: Vec::new(),
        },
        source_map: AdvisoryIngestSourceMapView {
            status: "context_unavailable".to_owned(),
            message: note.to_owned(),
            sources: Vec::new(),
        },
        issues: Vec::new(),
    }
}
/// Advisory-ingest view used when no ingest provider is configured; this is
/// a normal state, not an error.
fn advisory_ingest_missing_view() -> AdvisoryIngestProviderView {
    // Both sub-views carry the same status and explanatory message.
    let note = "no advisory ingest provider configured";
    AdvisoryIngestProviderView {
        status: "missing",
        configured_provider: None,
        effective_provider: None,
        sync_status: AdvisoryIngestSyncStatusView {
            status: "missing".to_owned(),
            message: note.to_owned(),
            checked_at_epoch_s: None,
            latest_successful_ingest_at_epoch_s: None,
            lag_seconds: None,
            scoped_source_coverage: Vec::new(),
        },
        source_map: AdvisoryIngestSourceMapView {
            status: "missing".to_owned(),
            message: note.to_owned(),
            sources: Vec::new(),
        },
        issues: Vec::new(),
    }
}
/// Inspect the advisory-ingest side of the memory config without running
/// any live probe.
///
/// Returns the view plus any resolution issues so the caller can merge them
/// into the overall inspection. Missing configuration yields the "missing"
/// view with no issues; a bad configuration yields an "error" view and the
/// corresponding issue.
fn inspect_advisory_ingest_provider(
effective: &EffectiveMemoryConfig,
layout: &StateLayout,
locality_id: &str,
) -> (AdvisoryIngestProviderView, Vec<MemoryProviderIssue>) {
let Some(configured_provider) = effective.configured_ingest_provider.clone() else {
return (advisory_ingest_missing_view(), Vec::new());
};
match resolve_ingest_provider(effective, layout, locality_id) {
Ok(resolved) => (
AdvisoryIngestProviderView {
status: "loaded",
configured_provider: Some(configured_provider),
effective_provider: Some(ProviderIdentityView {
name: resolved.name,
kind: resolved.kind.as_str().to_owned(),
source: resolved.source.as_str().to_owned(),
// Ingest capabilities are already plain strings (no `.names()`).
capabilities: resolved.capabilities,
allowed_scopes: resolved
.allowed_scopes
.iter()
.map(|scope| canonical_scope_name(scope).to_owned())
.collect(),
}),
// Live probes are deliberately deferred to the dedicated commands.
sync_status: AdvisoryIngestSyncStatusView {
status: "not_checked".to_owned(),
message:
"advisory ingest provider configured; run `ccd memory sync-status` for a live probe"
.to_owned(),
checked_at_epoch_s: None,
latest_successful_ingest_at_epoch_s: None,
lag_seconds: None,
scoped_source_coverage: Vec::new(),
},
source_map: AdvisoryIngestSourceMapView {
status: "not_checked".to_owned(),
message:
"advisory ingest provider configured; run `ccd memory source-map` for a live export"
.to_owned(),
sources: Vec::new(),
},
issues: Vec::new(),
},
Vec::new(),
),
Err(issue) => {
// The same message is mirrored into both sub-views and the issue list.
let message = issue.message.clone();
(
AdvisoryIngestProviderView {
status: "error",
configured_provider: Some(configured_provider),
effective_provider: None,
sync_status: AdvisoryIngestSyncStatusView {
status: "error".to_owned(),
message: message.clone(),
checked_at_epoch_s: None,
latest_successful_ingest_at_epoch_s: None,
lag_seconds: None,
scoped_source_coverage: Vec::new(),
},
source_map: AdvisoryIngestSourceMapView {
status: "error".to_owned(),
message: message.clone(),
sources: Vec::new(),
},
issues: vec![message],
},
vec![issue],
)
}
}
}
/// Load profile + optional repo-overlay memory config and merge them;
/// overlay (per-repo) settings win over profile-level settings.
fn load_effective_config(layout: &StateLayout, locality_id: &str) -> Result<EffectiveMemoryConfig> {
    let profile = layout.load_profile_config()?;
    let overlay = layout.load_repo_overlay_config(locality_id)?;
    let overlay_memory = overlay.as_ref().map(|config| &config.memory);
    Ok(merge_memory_config(&profile.memory, overlay_memory))
}
/// Load and merge the `[memory.start]` config from profile and overlay.
fn load_start_memory_config(layout: &StateLayout, locality_id: &str) -> Result<MemoryStartConfig> {
    let profile = load_start_memory_config_file(&layout.profile_config_path())?;
    // An unresolvable overlay path simply contributes defaults.
    let overlay = if let Ok(path) = layout.repo_overlay_config_path(locality_id) {
        load_start_memory_config_file(&path)?
    } else {
        MemoryStartConfig::default()
    };
    Ok(MemoryStartConfig::merge(&profile, &overlay))
}
fn load_start_memory_config_file(path: &Path) -> Result<MemoryStartConfig> {
if !path.exists() {
return Ok(MemoryStartConfig::default());
}
let contents =
fs::read_to_string(path).with_context(|| format!("failed to read {}", path.display()))?;
let parsed = toml::from_str::<MemoryStartConfigFile>(&contents)
.with_context(|| format!("failed to parse {}", path.display()))?;
Ok(parsed.memory.start)
}
/// Merge profile-level and optional overlay-level memory configuration into
/// one effective config.
///
/// For both provider tables, profile entries seed the map and overlay
/// entries replace same-named keys (`extend` overwrites on collision); for
/// scalar fields the overlay value wins, then the profile value, then the
/// builtin default.
fn merge_memory_config(
    profile: &RawMemoryConfig,
    overlay: Option<&RawMemoryConfig>,
) -> EffectiveMemoryConfig {
    let mut providers: BTreeMap<_, _> = profile
        .providers
        .iter()
        .map(|(name, config)| {
            (
                name.clone(),
                ResolvedProviderEntry {
                    source: ProviderSource::ProfileConfig,
                    config: config.clone(),
                },
            )
        })
        .collect();
    let mut ingest_providers: BTreeMap<_, _> = profile
        .ingest
        .iter()
        .map(|(name, config)| {
            (
                name.clone(),
                ResolvedProviderEntry {
                    source: ProviderSource::ProfileConfig,
                    config: config.clone(),
                },
            )
        })
        .collect();
    if let Some(overlay) = overlay {
        providers.extend(overlay.providers.iter().map(|(name, config)| {
            (
                name.clone(),
                ResolvedProviderEntry {
                    source: ProviderSource::RepoOverlay,
                    config: config.clone(),
                },
            )
        }));
        ingest_providers.extend(overlay.ingest.iter().map(|(name, config)| {
            (
                name.clone(),
                ResolvedProviderEntry {
                    source: ProviderSource::RepoOverlay,
                    config: config.clone(),
                },
            )
        }));
    }
    EffectiveMemoryConfig {
        authoritative_provider: overlay
            .and_then(|config| config.authoritative_provider.clone())
            .or_else(|| profile.authoritative_provider.clone())
            .unwrap_or_else(|| BUILTIN_PROVIDER_NAME.to_owned()),
        configured_recall_provider: overlay
            .and_then(|config| config.recall_provider.clone())
            .or_else(|| profile.recall_provider.clone()),
        configured_ingest_provider: overlay
            .and_then(|config| config.ingest_provider.clone())
            .or_else(|| profile.ingest_provider.clone()),
        start_recall_policy: overlay
            .and_then(|config| config.start_recall_policy.clone())
            .or_else(|| profile.start_recall_policy.clone()),
        providers,
        ingest_providers,
    }
}
/// Resolve the configured recall provider into a usable
/// `ResolvedRecallProvider`, or a `MemoryProviderIssue` describing the
/// misconfiguration (callers decide whether that forces a fallback).
///
/// No configured name, or a name normalizing to the builtin `markdown`,
/// short-circuits to the builtin provider.
fn resolve_recall_provider(
config: &EffectiveMemoryConfig,
layout: &StateLayout,
locality_id: &str,
) -> Result<ResolvedRecallProvider, MemoryProviderIssue> {
let Some(name) = config.configured_recall_provider.as_ref() else {
return Ok(builtin_markdown_provider(ProviderSource::BuiltinFallback));
};
// Names normalizing to `markdown` use the builtin, keeping the source of
// any matching `[memory.providers.*]` entry for reporting.
if normalize_name(name) == BUILTIN_PROVIDER_NAME {
let source = config
.providers
.get(name)
.map(|entry| entry.source)
.unwrap_or(ProviderSource::BuiltinFallback);
return Ok(builtin_markdown_provider(source));
}
// A non-builtin name must have a matching provider table entry.
let Some(entry) = config.providers.get(name) else {
let file = layout
.repo_overlay_config_path(locality_id)
.map(|path| path.display().to_string())
.unwrap_or_else(|_| layout.profile_config_path().display().to_string());
return Err(MemoryProviderIssue {
check: "memory_provider_config",
file,
message: format!(
"recall provider `{name}` is configured but no matching `[memory.providers.{name}]` entry exists"
),
});
};
let kind_raw = entry.config.kind.clone().unwrap_or_default();
let kind = match normalize_name(&kind_raw).as_str() {
"authored_markdown" => ProviderKind::AuthoredMarkdown,
"external_command" => ProviderKind::ExternalCommand,
_ => {
return Err(MemoryProviderIssue {
check: "memory_provider_config",
file: config_file_for(entry.source, layout, locality_id),
message: format!(
"recall provider `{name}` has unsupported kind `{kind_raw}`; valid kinds are `authored-markdown` and `external-command`"
),
})
}
};
match kind {
ProviderKind::AuthoredMarkdown => Ok(builtin_markdown_provider(entry.source)),
ProviderKind::ExternalCommand => {
// External providers carry extra validation (mode/command/caps/scopes).
resolve_external_provider(name, entry, layout, locality_id)
}
}
}
/// Resolve the configured advisory-ingest provider, or a
/// `MemoryProviderIssue` describing why it cannot be used.
///
/// Unlike recall, there is no builtin ingest provider: a missing
/// configuration is itself an issue, and `external-command` is the only
/// supported kind.
fn resolve_ingest_provider(
config: &EffectiveMemoryConfig,
layout: &StateLayout,
locality_id: &str,
) -> Result<ResolvedIngestProvider, MemoryProviderIssue> {
let Some(name) = config.configured_ingest_provider.as_ref() else {
return Err(MemoryProviderIssue {
check: "memory_ingest_provider_config",
file: layout.profile_config_path().display().to_string(),
message: "no advisory ingest provider is configured".to_owned(),
});
};
// The configured name must have a matching `[memory.ingest.*]` entry.
let Some(entry) = config.ingest_providers.get(name) else {
let file = layout
.repo_overlay_config_path(locality_id)
.map(|path| path.display().to_string())
.unwrap_or_else(|_| layout.profile_config_path().display().to_string());
return Err(MemoryProviderIssue {
check: "memory_ingest_provider_config",
file,
message: format!(
"advisory ingest provider `{name}` is configured but no matching `[memory.ingest.{name}]` entry exists"
),
});
};
let kind_raw = entry.config.kind.clone().unwrap_or_default();
match normalize_name(&kind_raw).as_str() {
"external_command" => resolve_external_ingest_provider(name, entry, layout, locality_id),
_ => Err(MemoryProviderIssue {
check: "memory_ingest_provider_config",
file: config_file_for(entry.source, layout, locality_id),
message: format!(
"advisory ingest provider `{name}` has unsupported kind `{kind_raw}`; valid kinds are `external-command`"
),
}),
}
}
/// Validate and materialize an external-command recall provider entry.
///
/// Checks, in order: `mode` (omitted => one-shot, `command-session` => a
/// persistent session), a non-empty `command` array, the full recall
/// capability set, and at least one valid allowed scope. The first failing
/// check returns a `MemoryProviderIssue` pointing at the config file the
/// entry came from.
fn resolve_external_provider(
name: &str,
entry: &ResolvedProviderEntry,
layout: &StateLayout,
locality_id: &str,
) -> std::result::Result<ResolvedRecallProvider, MemoryProviderIssue> {
let config_path = config_file_for(entry.source, layout, locality_id);
let mode = match entry.config.mode.as_deref().map(normalize_name) {
None => ExternalCommandMode::OneShot,
Some(value) if value == "command_session" => ExternalCommandMode::CommandSession,
Some(value) => {
return Err(MemoryProviderIssue {
check: "memory_provider_config",
file: config_path.clone(),
message: format!(
"external recall provider `{name}` has unsupported `mode` value `{value}`; valid values are `command-session` or omitted"
),
})
}
};
let command = match entry.config.command.clone() {
Some(command) if !command.is_empty() => command,
_ => {
return Err(MemoryProviderIssue {
check: "memory_provider_config",
file: config_path,
message: format!(
"external recall provider `{name}` requires a non-empty `command` array"
),
})
}
};
// External providers must declare the full recall capability surface.
let declared = entry.config.capabilities.clone().unwrap_or_default();
let capabilities = match RecallCapabilities::from_declared(&declared) {
Ok(capabilities) if capabilities.supports_all_recall_ops() => capabilities,
Ok(_) => {
return Err(MemoryProviderIssue {
check: "memory_provider_config",
file: config_path,
message: format!(
"external recall provider `{name}` must declare capabilities [{}]",
REQUIRED_EXTERNAL_CAPABILITIES.join(", ")
),
})
}
Err(error) => {
return Err(MemoryProviderIssue {
check: "memory_provider_config",
file: config_path,
message: format!(
"external recall provider `{name}` has invalid capabilities: {error:#}"
),
})
}
};
// At least one recognized scope is required.
let allowed_scopes =
match normalize_scopes(entry.config.allowed_scopes.clone().unwrap_or_default()) {
Ok(scopes) if !scopes.is_empty() => scopes,
Ok(_) => {
return Err(MemoryProviderIssue {
check: "memory_provider_config",
file: config_path,
message: format!(
"external recall provider `{name}` must declare at least one allowed scope"
),
})
}
Err(error) => {
return Err(MemoryProviderIssue {
check: "memory_provider_config",
file: config_path,
message: format!(
"external recall provider `{name}` has invalid `allowed_scopes`: {error:#}"
),
})
}
};
Ok(ResolvedRecallProvider {
name: name.to_owned(),
kind: ProviderKind::ExternalCommand,
source: entry.source,
capabilities,
allowed_scopes,
inputs: entry.config.inputs.clone(),
backend: ProviderBackend::ExternalCommand(ExternalCommandConfig {
mode,
command,
timeout_s: entry.config.timeout_s.unwrap_or(DEFAULT_PROVIDER_TIMEOUT_S),
}),
})
}
/// Validate and materialize an external-command advisory-ingest provider.
///
/// Mirrors `resolve_external_provider`: `mode` and `command` are parsed and
/// bound once during validation and reused when constructing the backend,
/// instead of being validated up front and re-derived (previously with an
/// `.expect(...)`) at construction time.
fn resolve_external_ingest_provider(
    name: &str,
    entry: &ResolvedProviderEntry,
    layout: &StateLayout,
    locality_id: &str,
) -> std::result::Result<ResolvedIngestProvider, MemoryProviderIssue> {
    let config_path = config_file_for(entry.source, layout, locality_id);
    // Parse `mode` exactly once; the parsed value feeds the backend below.
    let mode = match entry.config.mode.as_deref().map(normalize_name) {
        None => ExternalCommandMode::OneShot,
        Some(value) if value == "command_session" => ExternalCommandMode::CommandSession,
        Some(value) => {
            return Err(MemoryProviderIssue {
                check: "memory_ingest_provider_config",
                file: config_path.clone(),
                message: format!(
                    "advisory ingest provider `{name}` has unsupported `mode` value `{value}`; valid values are `command-session` or omitted"
                ),
            })
        }
    };
    // Bind the command during validation so no later `expect` is needed.
    let command = match entry.config.command.clone() {
        Some(command) if !command.is_empty() => command,
        _ => {
            return Err(MemoryProviderIssue {
                check: "memory_ingest_provider_config",
                file: config_path.clone(),
                message: format!(
                    "advisory ingest provider `{name}` requires a non-empty `command` array"
                ),
            })
        }
    };
    // Reporting providers must declare both sync-status and source-map.
    let capabilities = match normalize_ingest_capabilities(
        entry.config.capabilities.clone().unwrap_or_default(),
    ) {
        Ok(capabilities)
            if REQUIRED_INGEST_REPORTING_CAPABILITIES
                .iter()
                .all(|required| capabilities.iter().any(|value| value == required)) =>
        {
            capabilities
        }
        Ok(_) => {
            return Err(MemoryProviderIssue {
                check: "memory_ingest_provider_config",
                file: config_path.clone(),
                message: format!(
                    "advisory ingest provider `{name}` must declare capabilities [{}]",
                    REQUIRED_INGEST_REPORTING_CAPABILITIES.join(", ")
                ),
            })
        }
        Err(error) => {
            return Err(MemoryProviderIssue {
                check: "memory_ingest_provider_config",
                file: config_path.clone(),
                message: format!(
                    "advisory ingest provider `{name}` has invalid capabilities: {error:#}"
                ),
            })
        }
    };
    // At least one recognized scope is required.
    let allowed_scopes =
        match normalize_scopes(entry.config.allowed_scopes.clone().unwrap_or_default()) {
            Ok(scopes) if !scopes.is_empty() => scopes,
            Ok(_) => {
                return Err(MemoryProviderIssue {
                    check: "memory_ingest_provider_config",
                    file: config_path,
                    message: format!(
                        "advisory ingest provider `{name}` must declare at least one allowed scope"
                    ),
                })
            }
            Err(error) => {
                return Err(MemoryProviderIssue {
                    check: "memory_ingest_provider_config",
                    file: config_path,
                    message: format!(
                        "advisory ingest provider `{name}` has invalid `allowed_scopes`: {error:#}"
                    ),
                })
            }
        };
    Ok(ResolvedIngestProvider {
        name: name.to_owned(),
        kind: ProviderKind::ExternalCommand,
        source: entry.source,
        capabilities,
        allowed_scopes,
        inputs: entry.config.inputs.clone(),
        backend: ProviderBackend::ExternalCommand(ExternalCommandConfig {
            mode,
            command,
            timeout_s: entry.config.timeout_s.unwrap_or(DEFAULT_PROVIDER_TIMEOUT_S),
        }),
    })
}
/// Construct the builtin authored-markdown recall provider with the given
/// source attribution.
fn builtin_markdown_provider(source: ProviderSource) -> ResolvedRecallProvider {
    // The builtin provider supports every recall capability and every scope.
    let allowed_scopes = VALID_SCOPES
        .iter()
        .map(|scope| (*scope).to_owned())
        .collect();
    ResolvedRecallProvider {
        name: BUILTIN_PROVIDER_NAME.to_owned(),
        kind: ProviderKind::AuthoredMarkdown,
        source,
        capabilities: RecallCapabilities::all(),
        allowed_scopes,
        inputs: BTreeMap::new(),
        backend: ProviderBackend::AuthoredMarkdown,
    }
}
/// Map a provider source back to the config file path its entry came from.
fn config_file_for(source: ProviderSource, layout: &StateLayout, locality_id: &str) -> String {
    // Only overlay-sourced entries point at the per-repo overlay file; if
    // that path cannot be resolved, fall back to the profile config path.
    if let ProviderSource::RepoOverlay = source {
        if let Ok(path) = layout.repo_overlay_config_path(locality_id) {
            return path.display().to_string();
        }
    }
    layout.profile_config_path().display().to_string()
}
/// Resolves the `memory.start_recall_policy` setting.
///
/// An explicit `auto`/`disabled` value wins; any other explicit value fails
/// closed to `Disabled` and records a config issue. When the setting is
/// absent, recall auto-enables only if the user explicitly configured an
/// external-command recall provider.
fn resolve_start_recall_policy(
    raw: Option<&str>,
    configured_recall_provider: Option<&str>,
    resolved: &ResolvedRecallProvider,
    issues: &mut Vec<MemoryProviderIssue>,
    config_path: String,
) -> StartRecallPolicy {
    if let Some(value) = raw.map(normalize_name) {
        return match value.as_str() {
            "auto" => StartRecallPolicy::Auto,
            "disabled" => StartRecallPolicy::Disabled,
            _ => {
                // Unknown values fail closed and surface a diagnostic.
                issues.push(MemoryProviderIssue {
                    check: "memory_provider_config",
                    file: config_path,
                    message: format!(
                        "unsupported `memory.start_recall_policy` value `{value}`; valid values are `disabled` and `auto`"
                    ),
                });
                StartRecallPolicy::Disabled
            }
        };
    }
    let auto_by_default =
        configured_recall_provider.is_some() && resolved.kind == ProviderKind::ExternalCommand;
    if auto_by_default {
        StartRecallPolicy::Auto
    } else {
        StartRecallPolicy::Disabled
    }
}
/// Reports provider health.
///
/// The builtin markdown backend has no external dependency and is always
/// healthy; external-command backends are probed with a `health` operation
/// whose payload is required in the response.
///
/// # Errors
/// Fails when the external operation fails, when the provider omits the
/// `health` payload, or when the local clock read is needed and fails.
fn inspect_health(
    provider: &ResolvedRecallProvider,
    repo_root: &Path,
    layout: &StateLayout,
    locality_id: &str,
) -> Result<ProviderHealthView> {
    match &provider.backend {
        ProviderBackend::AuthoredMarkdown => Ok(ProviderHealthView {
            status: "ok".to_owned(),
            message: "authored markdown recall is available from local runtime memory".to_owned(),
            checked_at_epoch_s: Some(now_epoch_s()?),
        }),
        ProviderBackend::ExternalCommand(_) => {
            let response = execute_operation(
                provider,
                repo_root,
                layout,
                locality_id,
                OperationArgs {
                    operation: "health",
                    query: None,
                    native_id: None,
                    limit: None,
                },
            )?;
            let health = response.health.ok_or_else(|| {
                anyhow!(
                    "recall provider `{}` returned no `health` payload",
                    provider.name
                )
            })?;
            // Fix: evaluate the clock lazily. The previous
            // `.or(Some(now_epoch_s()?))` always performed the fallible clock
            // read, even when the provider already supplied a timestamp.
            let checked_at_epoch_s = match health.checked_at_epoch_s {
                Some(epoch_s) => Some(epoch_s),
                None => Some(now_epoch_s()?),
            };
            Ok(ProviderHealthView {
                status: health.status,
                message: health.message.unwrap_or_else(|| {
                    format!("recall provider `{}` responded to health", provider.name)
                }),
                checked_at_epoch_s,
            })
        }
    }
}
/// Runs a recall search against the provider's backend.
///
/// The builtin backend searches locally loaded runtime memory; external
/// backends go through the adapter protocol, have their results tagged as
/// untrusted adapter output, and are clamped to `limit`.
fn search_results(
    provider: &ResolvedRecallProvider,
    repo_root: &Path,
    layout: &StateLayout,
    locality_id: &str,
    query: &str,
    limit: usize,
) -> Result<Vec<RecallResult>> {
    if matches!(provider.backend, ProviderBackend::AuthoredMarkdown) {
        let loaded = runtime_state::load_runtime_state(repo_root, layout, locality_id)?;
        return Ok(search_authored_markdown(
            &loaded,
            &provider.allowed_scopes,
            query,
            limit,
        ));
    }
    let response = execute_operation(
        provider,
        repo_root,
        layout,
        locality_id,
        OperationArgs {
            operation: "search",
            query: Some(query),
            native_id: None,
            limit: Some(limit),
        },
    )?;
    let mut results = response.results.unwrap_or_default();
    apply_external_content_trust(&mut results);
    // Providers may over-return; enforce the caller's limit locally.
    results.truncate(limit);
    Ok(results)
}
/// Resolves a single result by its provider-native id.
///
/// The builtin backend looks the id up in runtime memory; external backends
/// may answer with either a singular `result` or a `results` list, so both
/// are merged (singular first) and tagged as untrusted adapter output.
fn describe_result(
    provider: &ResolvedRecallProvider,
    repo_root: &Path,
    layout: &StateLayout,
    locality_id: &str,
    native_id: &str,
) -> Result<Vec<RecallResult>> {
    if matches!(provider.backend, ProviderBackend::AuthoredMarkdown) {
        let loaded = runtime_state::load_runtime_state(repo_root, layout, locality_id)?;
        return authored_markdown_describe(&loaded, native_id);
    }
    let response = execute_operation(
        provider,
        repo_root,
        layout,
        locality_id,
        OperationArgs {
            operation: "describe",
            query: None,
            native_id: Some(native_id),
            limit: Some(1),
        },
    )?;
    let mut results: Vec<RecallResult> = response
        .result
        .into_iter()
        .chain(response.results.unwrap_or_default())
        .collect();
    apply_external_content_trust(&mut results);
    Ok(results)
}
/// Expands a result to related entries, up to `limit`.
///
/// The builtin backend expands within runtime memory; external backends are
/// asked through the adapter protocol and their results tagged as untrusted.
fn expand_result(
    provider: &ResolvedRecallProvider,
    repo_root: &Path,
    layout: &StateLayout,
    locality_id: &str,
    native_id: &str,
    limit: usize,
) -> Result<Vec<RecallResult>> {
    if matches!(provider.backend, ProviderBackend::AuthoredMarkdown) {
        let loaded = runtime_state::load_runtime_state(repo_root, layout, locality_id)?;
        return authored_markdown_expand(&loaded, native_id, limit);
    }
    let response = execute_operation(
        provider,
        repo_root,
        layout,
        locality_id,
        OperationArgs {
            operation: "expand",
            query: None,
            native_id: Some(native_id),
            limit: Some(limit),
        },
    )?;
    let mut results = response.results.unwrap_or_default();
    apply_external_content_trust(&mut results);
    Ok(results)
}
/// Builds the v1 adapter request for `operation` and dispatches it to the
/// external command backend.
///
/// # Errors
/// The builtin markdown backend must never reach this function; hitting it
/// is an internal dispatch bug and fails with an explicit error.
fn execute_operation(
    provider: &ResolvedRecallProvider,
    repo_root: &Path,
    layout: &StateLayout,
    locality_id: &str,
    operation: OperationArgs<'_>,
) -> Result<RecallProviderResponse> {
    let config = match &provider.backend {
        ProviderBackend::ExternalCommand(config) => config,
        ProviderBackend::AuthoredMarkdown => bail!(
            "internal error: authored markdown provider operation `{}` must be dispatched through the builtin path",
            operation.operation
        ),
    };
    let request = RecallProviderRequest {
        schema_version: 1,
        operation: operation.operation.to_owned(),
        profile: layout.profile().as_str().to_owned(),
        repo_root: repo_root.to_path_buf(),
        locality_id: locality_id.to_owned(),
        timeout_s: config.timeout_s,
        scopes: provider.allowed_scopes.clone(),
        inputs: provider.inputs.clone(),
        query: operation.query.map(str::to_owned),
        native_id: operation.native_id.map(str::to_owned),
        limit: operation.limit,
    };
    execute_external(provider, config, &request)
}
// Runs a one-shot external recall provider: spawns the configured command,
// writes the JSON request as a single line to its stdin, then polls for exit
// under a deadline and parses stdout as the response envelope.
fn execute_external(
provider: &ResolvedRecallProvider,
config: &ExternalCommandConfig,
request: &RecallProviderRequest,
) -> Result<RecallProviderResponse> {
use std::process::{Command, Stdio};
// First element of `command` is the binary name; the rest are arguments.
let binary = config
.command
.first()
.ok_or_else(|| anyhow!("recall provider `{}` has an empty command", provider.name))?;
let resolved_binary = find_in_path(binary).ok_or_else(|| {
anyhow!(
"external recall provider `{}`: binary `{binary}` not found on PATH",
provider.name
)
})?;
let mut child = Command::new(&resolved_binary)
.args(&config.command[1..])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.with_context(|| {
format!(
"external recall provider `{}`: failed to spawn `{}`",
provider.name,
resolved_binary.display()
)
})?;
{
// Write the whole request followed by a newline.
let stdin = child.stdin.as_mut().expect("stdin should be piped");
let json = serde_json::to_vec(request)?;
stdin.write_all(&json)?;
stdin.write_all(b"\n")?;
}
// Close stdin so the child sees EOF and knows the request is complete.
drop(child.stdin.take());
let deadline = Instant::now() + Duration::from_secs(config.timeout_s);
// Poll-wait loop: stdout/stderr are only drained after the child exits.
// NOTE(review): a provider that writes more output than the OS pipe buffer
// before exiting would block on write and be reported as a timeout here —
// confirm providers keep responses small or consider draining concurrently.
loop {
match child.try_wait()? {
Some(exit_status) => {
let output = child.wait_with_output()?;
if !exit_status.success() {
// Echo at most 1024 chars of stderr into the error message.
let stderr = String::from_utf8_lossy(&output.stderr);
let truncated: String = stderr.chars().take(1024).collect();
bail!(
"external recall provider `{}` exited with {}: {}",
provider.name,
exit_status,
truncated.trim()
);
}
let stdout = String::from_utf8_lossy(&output.stdout);
return parse_and_validate_response(&provider.name, &stdout);
}
None => {
if Instant::now() >= deadline {
// Best-effort kill and reap; the reported failure is the timeout.
let _ = child.kill();
let _ = child.wait();
bail!(
"external recall provider `{}` timed out after {}s",
provider.name,
config.timeout_s
);
}
std::thread::sleep(Duration::from_millis(50));
}
}
}
}
/// Decodes a provider's stdout into a [`RecallProviderResponse`] and enforces
/// the v1 envelope contract (`schema_version == 1`, `ok == true`).
///
/// # Errors
/// Fails on invalid JSON, an unsupported schema version, or a provider-side
/// error envelope.
fn parse_and_validate_response(
    provider_name: &str,
    stdout: &str,
) -> Result<RecallProviderResponse> {
    let response = serde_json::from_str::<RecallProviderResponse>(stdout).with_context(|| {
        format!("external recall provider `{provider_name}` returned invalid JSON")
    })?;
    if response.schema_version != 1 {
        bail!(
            "external recall provider `{provider_name}` returned unsupported schema_version {}",
            response.schema_version
        );
    }
    if response.ok {
        return Ok(response);
    }
    let message = response
        .error
        .as_deref()
        .unwrap_or("unknown error (missing `error` field)");
    bail!("external recall provider `{provider_name}` returned error: {message}");
}
/// Resolves `binary` to an executable path.
///
/// A name containing a path separator is treated as an explicit path and only
/// validated (never searched); bare names are looked up in each `PATH`
/// directory in order. Returns `None` when nothing executable is found.
fn find_in_path(binary: &str) -> Option<PathBuf> {
    // Fix: recognize every platform path separator (e.g. `\` on Windows) via
    // `std::path::is_separator` instead of only `/`, so `\`-qualified commands
    // are not incorrectly searched on PATH on non-unix targets.
    if binary.chars().any(std::path::is_separator) {
        let path = PathBuf::from(binary);
        return is_executable_candidate(&path).then_some(path);
    }
    let path_var = env::var_os("PATH")?;
    env::split_paths(&path_var)
        .map(|dir| dir.join(binary))
        .find(|candidate| is_executable_candidate(candidate))
}
#[cfg(unix)]
/// A candidate is runnable when it is a regular file with any execute bit
/// (user, group, or other) set; metadata errors count as not runnable.
fn is_executable_candidate(path: &Path) -> bool {
    match fs::metadata(path) {
        Ok(metadata) => metadata.is_file() && metadata.permissions().mode() & 0o111 != 0,
        Err(_) => false,
    }
}
#[cfg(not(unix))]
/// Without unix permission bits, any existing regular file is considered
/// runnable.
fn is_executable_candidate(path: &Path) -> bool {
    fs::metadata(path)
        .map(|metadata| metadata.is_file())
        .unwrap_or(false)
}
/// Scores every runtime memory entry in the allowed scopes against `query`
/// and returns the top `limit` matches with 1-based ranks.
///
/// Ordering: descending score, then scope priority (narrower scopes first),
/// then native id for a deterministic tiebreak.
fn search_authored_markdown(
    loaded: &runtime_state::LoadedRuntimeState,
    scopes: &[String],
    query: &str,
    limit: usize,
) -> Vec<RecallResult> {
    let mut ranked: Vec<(f64, usize, RecallResult)> = Vec::new();
    for (scope, index, entry, surface_path) in iter_runtime_entries(loaded, scopes) {
        let Some(score) = authored_markdown_score(query, entry) else {
            continue;
        };
        let priority = scope_priority(&scope);
        // Provisional rank; the real rank is assigned after sorting.
        let provisional_rank = ranked.len() + 1;
        let candidate = build_authored_markdown_result(
            scope,
            index,
            entry,
            Some(surface_path),
            true,
            provisional_rank,
        );
        ranked.push((score, priority, candidate));
    }
    ranked.sort_by(|left, right| {
        let by_score = right
            .0
            .partial_cmp(&left.0)
            .unwrap_or(std::cmp::Ordering::Equal);
        by_score
            .then_with(|| left.1.cmp(&right.1))
            .then_with(|| left.2.reference.native_id.cmp(&right.2.reference.native_id))
    });
    let mut results = Vec::new();
    for (position, (_, _, mut result)) in ranked.into_iter().take(limit).enumerate() {
        result.rank = position + 1;
        results.push(result);
    }
    results
}
/// Describes a runtime memory entry by its native id, attaching the full
/// entry text as detail. Unknown ids yield an empty (non-error) response.
fn authored_markdown_describe(
    loaded: &runtime_state::LoadedRuntimeState,
    native_id: &str,
) -> Result<Vec<RecallResult>> {
    match find_runtime_entry(loaded, native_id) {
        None => Ok(Vec::new()),
        Some((scope, index, entry, surface_path)) => {
            let mut result =
                build_authored_markdown_result(scope, index, entry, Some(surface_path), true, 1);
            result.detail = Some(entry.text.clone());
            Ok(vec![result])
        }
    }
}
/// Expands an entry to its immediate neighbors (the entry just before and
/// just after it within the same scope), capped at `limit`. An entry with no
/// neighbors expands to itself so callers always receive detail text.
fn authored_markdown_expand(
    loaded: &runtime_state::LoadedRuntimeState,
    native_id: &str,
    limit: usize,
) -> Result<Vec<RecallResult>> {
    let Some((scope, index, entry, surface_path)) = find_runtime_entry(loaded, native_id) else {
        return Ok(Vec::new());
    };
    let entries = scope_entries(loaded, &scope);
    let window_start = index.saturating_sub(1);
    let window_end = usize::min(entries.len(), index + 2);
    let mut results = Vec::new();
    for neighbor_index in window_start..window_end {
        if neighbor_index == index {
            continue;
        }
        let neighbor = &entries[neighbor_index];
        let mut result = build_authored_markdown_result(
            scope.clone(),
            neighbor_index,
            neighbor,
            Some(surface_path.clone()),
            false,
            results.len() + 1,
        );
        result.detail = Some(neighbor.text.clone());
        results.push(result);
        // Limit check happens after the push, matching the provider contract.
        if results.len() >= limit {
            break;
        }
    }
    if results.is_empty() {
        let mut own =
            build_authored_markdown_result(scope, index, entry, Some(surface_path), false, 1);
        own.detail = Some(entry.text.clone());
        results.push(own);
    }
    Ok(results)
}
/// Flattens every entry from the requested scopes into
/// `(scope, index, entry, surface path)` tuples, preserving scope order.
fn iter_runtime_entries<'a>(
    loaded: &'a runtime_state::LoadedRuntimeState,
    scopes: &[String],
) -> Vec<(String, usize, &'a RuntimeMemoryEntry, String)> {
    let mut collected = Vec::new();
    for scope in scopes {
        let entries = scope_entries(loaded, scope);
        for index in 0..entries.len() {
            collected.push((
                scope.clone(),
                index,
                &entries[index],
                scope_surface_path(loaded, scope),
            ));
        }
    }
    collected
}
/// Maps a canonical scope name to its runtime memory entries; unknown scopes
/// yield an empty slice.
fn scope_entries<'a>(
    loaded: &'a runtime_state::LoadedRuntimeState,
    scope: &str,
) -> &'a [RuntimeMemoryEntry] {
    let memory = &loaded.state.memory;
    match scope {
        "profile" => memory.profile.as_slice(),
        "repo" => memory.locality.as_slice(),
        "pod" => memory.pod.as_slice(),
        "branch" => memory.branch.as_slice(),
        "clone" => memory.clone.as_slice(),
        _ => &[],
    }
}
/// Returns the display path of the memory surface backing `scope`, or an
/// empty string for unknown scopes.
fn scope_surface_path(loaded: &runtime_state::LoadedRuntimeState, scope: &str) -> String {
    let sources = &loaded.sources;
    let surface = match scope {
        "profile" => Some(&sources.profile_memory),
        "repo" => Some(&sources.locality_memory),
        "pod" => Some(&sources.pod_memory),
        "branch" => Some(&sources.branch_memory),
        "clone" => Some(&sources.clone_memory),
        _ => None,
    };
    surface
        .map(|surface| surface.path.display().to_string())
        .unwrap_or_default()
}
/// Scans every scope in canonical order for the entry whose derived native id
/// matches `native_id`, returning its scope, index, entry, and surface path.
fn find_runtime_entry<'a>(
    loaded: &'a runtime_state::LoadedRuntimeState,
    native_id: &str,
) -> Option<(String, usize, &'a RuntimeMemoryEntry, String)> {
    VALID_SCOPES.iter().find_map(|scope| {
        scope_entries(loaded, scope)
            .iter()
            .enumerate()
            .find(|(index, entry)| authored_markdown_native_id(scope, *index, entry) == native_id)
            .map(|(index, entry)| {
                (
                    (*scope).to_owned(),
                    index,
                    entry,
                    scope_surface_path(loaded, scope),
                )
            })
    })
}
/// Assembles a [`RecallResult`] for a runtime memory entry.
///
/// Structured entries are titled `scope:id` and carry their entry type as the
/// memory class; narrative entries fall back to a truncated projection of
/// their text and no class.
fn build_authored_markdown_result(
    scope: String,
    index: usize,
    entry: &RuntimeMemoryEntry,
    surface_path: Option<String>,
    expandable: bool,
    rank: usize,
) -> RecallResult {
    let native_id = authored_markdown_native_id(&scope, index, entry);
    let title = match entry.structured_id() {
        Some(id) => format!("{scope}:{id}"),
        None => truncate(&entry.projection_text(), 64),
    };
    let memory_class = if let RuntimeMemoryOrigin::Structured { entry_type, .. } = &entry.origin {
        Some(entry_type.clone())
    } else {
        None
    };
    let reference = RecallProviderReference {
        provider: BUILTIN_PROVIDER_NAME.to_owned(),
        kind: ProviderKind::AuthoredMarkdown.as_str().to_owned(),
        native_id,
        native_scope: Some(scope.clone()),
        uri: None,
    };
    let provenance = RecallProvenance {
        source_ref: entry.source_ref.clone(),
        surface_path,
    };
    // `created_at` doubles as both observed and updated hints for markdown.
    let temporal = RecallTemporalHints {
        observed_at: entry.created_at.clone(),
        updated_at: entry.created_at.clone(),
        valid_at: entry.expires_at.clone(),
    };
    RecallResult {
        reference,
        scope,
        title,
        rank,
        score: None,
        preview: Some(truncate(&entry.text, START_RECALL_PREVIEW_CHARS)),
        detail: None,
        provenance: Some(provenance),
        temporal: Some(temporal),
        memory_class,
        expansion: RecallExpansion {
            available: expandable,
        },
        content_trust: None,
    }
}
/// Stable identifier for a runtime entry: the structured id when present,
/// otherwise a positional `scope:index` fallback.
fn authored_markdown_native_id(scope: &str, index: usize, entry: &RuntimeMemoryEntry) -> String {
    match entry.structured_id() {
        Some(id) => id.to_owned(),
        None => format!("{scope}:{index}"),
    }
}
/// Scores an entry against a query over its projection text, tags, and
/// source ref (all lowercased). A whole-phrase hit scores 10; each matching
/// token adds 1. Returns `None` for an empty query or a zero score.
fn authored_markdown_score(query: &str, entry: &RuntimeMemoryEntry) -> Option<f64> {
    let needle = query.trim().to_ascii_lowercase();
    if needle.is_empty() {
        return None;
    }
    let haystack = format!(
        "{} {} {}",
        entry.projection_text(),
        entry.tags.join(" "),
        entry.source_ref.as_deref().unwrap_or("")
    )
    .to_ascii_lowercase();
    let mut score = 0.0;
    if haystack.contains(&needle) {
        score += 10.0;
    }
    score += tokenize(&needle)
        .into_iter()
        .filter(|token| haystack.contains(token))
        .count() as f64;
    (score > 0.0).then_some(score)
}
/// Tie-break priority for scopes: narrower scopes rank first
/// (clone < branch < repo < pod < profile); unknown scopes come last.
fn scope_priority(scope: &str) -> usize {
    const ORDER: [&str; 5] = ["clone", "branch", "repo", "pod", "profile"];
    ORDER
        .iter()
        .position(|&name| name == scope)
        .unwrap_or(ORDER.len())
}
/// Tags every result from an external adapter as untrusted adapter output so
/// downstream consumers handle the content accordingly.
fn apply_external_content_trust(results: &mut [RecallResult]) {
    results
        .iter_mut()
        .for_each(|result| result.content_trust = Some(ContentTrust::ExternalAdapterOutput));
}
/// Derives a start-recall query from the compiled handoff: the title (minus
/// its `Next Session:` prefix) when meaningful, otherwise the first
/// non-blank immediate action truncated to 120 chars.
fn derive_start_query(handoff: &compiled::CompiledHandoffView) -> Option<String> {
    if !handoff.title.trim().is_empty() && handoff.title != "No active session" {
        let cleaned = handoff
            .title
            .strip_prefix("Next Session:")
            .unwrap_or(&handoff.title)
            .trim();
        if !cleaned.is_empty() {
            return Some(cleaned.to_owned());
        }
    }
    handoff
        .immediate_actions
        .iter()
        .map(|item| item.trim())
        .find(|item| !item.is_empty())
        .map(|item| truncate(item, 120))
}
/// Canonicalizes a config name: trimmed, ASCII-lowercased, with hyphens
/// mapped to underscores.
fn normalize_name(value: &str) -> String {
    value
        .trim()
        .chars()
        .map(|ch| match ch {
            '-' => '_',
            other => other.to_ascii_lowercase(),
        })
        .collect()
}
/// Normalizes ingest capability names to their canonical hyphenated form,
/// deduplicates, and returns them sorted.
///
/// # Errors
/// Fails on any capability outside `ingest`, `sync-status`, `source-map`.
fn normalize_ingest_capabilities(values: Vec<String>) -> Result<Vec<String>> {
    let mut seen = HashSet::new();
    let mut normalized = Vec::new();
    for value in values {
        // `normalize_name` maps hyphens to underscores, so match on that form.
        let canonical = normalize_name(&value);
        let capability = match canonical.as_str() {
            "ingest" => "ingest",
            "sync_status" => "sync-status",
            "source_map" => "source-map",
            other => bail!("unsupported capability `{other}`"),
        }
        .to_owned();
        if seen.insert(capability.clone()) {
            normalized.push(capability);
        }
    }
    normalized.sort_unstable();
    Ok(normalized)
}
/// Normalizes scope names, mapping legacy aliases (`locality`/`project` →
/// `repo`, `work_stream` → `branch`, `workspace` → `clone`) onto canonical
/// names and dropping duplicates while preserving first-seen order.
///
/// # Errors
/// Fails on any scope not in [`VALID_SCOPES`] after aliasing.
fn normalize_scopes(scopes: Vec<String>) -> Result<Vec<String>> {
    let mut seen = HashSet::new();
    let mut normalized = Vec::new();
    for raw in scopes {
        let canonical = normalize_name(&raw);
        let scope = match canonical.as_str() {
            "locality" | "project" => "repo".to_owned(),
            "work_stream" => "branch".to_owned(),
            "workspace" => "clone".to_owned(),
            _ => canonical,
        };
        if !VALID_SCOPES.contains(&scope.as_str()) {
            bail!("unsupported scope `{scope}`");
        }
        if seen.insert(scope.clone()) {
            normalized.push(scope);
        }
    }
    Ok(normalized)
}
/// Char-aware truncation to at most `max_chars` characters, appending an
/// ASCII ellipsis only when text was actually cut.
fn truncate(value: &str, max_chars: usize) -> String {
    let mut result = String::new();
    let mut chars = value.chars();
    for _ in 0..max_chars {
        match chars.next() {
            Some(ch) => result.push(ch),
            None => return result,
        }
    }
    if chars.next().is_some() {
        result.push_str("...");
    }
    result
}
/// Splits a query into ASCII-alphanumeric tokens, discarding tokens shorter
/// than three bytes as noise.
fn tokenize(query: &str) -> Vec<String> {
    let mut tokens = Vec::new();
    for token in query.split(|ch: char| !ch.is_ascii_alphanumeric()) {
        if token.len() >= 3 {
            tokens.push(token.to_owned());
        }
    }
    tokens
}
fn now_epoch_s() -> Result<u64> {
Ok(SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|error| anyhow!("system clock is before the Unix epoch: {error}"))?
.as_secs())
}
#[cfg(test)]
mod tests {
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use serde_json::{json, Value as JsonValue};
use tempfile::tempdir;
use super::*;
use crate::memory::recall::harness::{
assert_provider_conforms, assert_provider_returns_empty_search,
};
use crate::memory::recall::ProviderReference as DetailedProviderReference;
use crate::profile;
use crate::state::runtime::{RuntimeLifecycle, RuntimeMemoryState, RuntimeState};
// Builds a minimal narrative (non-structured) runtime memory entry with a
// fixed creation timestamp and a single `semantic` tag, for use in tests.
fn markdown_entry(text: &str) -> RuntimeMemoryEntry {
RuntimeMemoryEntry {
id: None,
text: text.to_owned(),
lifecycle: RuntimeLifecycle::Active,
origin: RuntimeMemoryOrigin::Narrative,
state: None,
created_at: Some("2026-03-31T00:00:00Z".to_owned()),
last_touched_session: None,
superseded_at: None,
decay_class: None,
expires_at: None,
tags: vec!["semantic".to_owned()],
source_ref: None,
supersedes: Vec::new(),
}
}
// Builds a LoadedRuntimeState whose `clone` scope holds `entries` and whose
// source surfaces are all "missing" placeholders pointing at a scratch file.
// NOTE(review): `dir` is dropped when this function returns, which deletes
// `memory.md`; the surfaces are placeholders so this appears intentional —
// confirm no test reads the path contents afterwards.
fn loaded_runtime_with_entries(
entries: Vec<RuntimeMemoryEntry>,
) -> runtime_state::LoadedRuntimeState {
let dir = tempdir().expect("tempdir");
let path = dir.path().join("memory.md");
std::fs::write(&path, "# Memory\n").expect("write memory");
runtime_state::LoadedRuntimeState {
state: RuntimeState {
memory: RuntimeMemoryState {
clone: entries,
..RuntimeMemoryState::default()
},
..RuntimeState::default()
},
sources: runtime_state::RuntimeSourceSurfaces {
profile_memory: runtime_state::RuntimeTextSurface::missing("profile_memory", &path),
locality_memory: runtime_state::RuntimeTextSurface::missing(
"locality_memory",
&path,
),
pod_memory: runtime_state::RuntimeTextSurface::missing("pod_memory", &path),
branch_memory: runtime_state::RuntimeTextSurface::missing("branch_memory", &path),
clone_memory: runtime_state::RuntimeTextSurface::missing("clone_memory", &path),
execution_gates: runtime_state::RuntimeTextSurface::missing(
"execution_gates",
&path,
),
handoff: runtime_state::RuntimeTextSurface::missing("handoff", &path),
},
execution_gates: runtime_state::LoadedRuntimeExecutionGates {
source: runtime_state::RuntimeTextSurface::missing("execution_gates", &path),
view: crate::state::session_gates::ExecutionGatesView {
path: path.display().to_string(),
status: "missing",
revision: None,
total_count: 0,
unfinished_count: 0,
seeded_from: None,
attention_anchor: None,
gates: Vec::new(),
},
},
recovery: runtime_state::LoadedRuntimeRecoveryState {
state: runtime_state::RuntimeRecoveryState::default(),
sources: runtime_state::RuntimeRecoverySurfaces {
checkpoint: runtime_state::RuntimeRecoverySurface {
kind: "checkpoint",
path: path.clone(),
status: "missing",
},
working_buffer: runtime_state::RuntimeRecoverySurface {
kind: "working_buffer",
path,
status: "missing",
},
},
},
pod_identity_active: false,
}
}
// Builds an external-command recall adapter backed by a fixture script from
// `tests/fixtures`, with a fresh git repo as the workspace. `.py` fixtures
// are routed through `python3`; others run directly.
// NOTE(review): `repo_root` (a TempDir) is dropped on return while the
// adapter keeps only its path — confirm the adapter does not touch the
// repo root on disk during these tests.
fn external_fixture_provider_with(
script_name: &str,
mode: ExternalCommandMode,
inputs: BTreeMap<String, String>,
) -> ResolvedRecallAdapter {
let repo_root = tempdir().expect("tempdir");
std::process::Command::new("git")
.arg("init")
.arg(repo_root.path())
.output()
.expect("git init");
let layout =
StateLayout::resolve(repo_root.path(), profile::resolve(None).expect("profile"))
.expect("layout");
let script = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests/fixtures")
.join(script_name)
.canonicalize()
.expect("fixture path");
let command = if script.extension().and_then(|ext| ext.to_str()) == Some("py") {
vec!["python3".to_owned(), script.display().to_string()]
} else {
vec![script.display().to_string()]
};
ResolvedRecallAdapter::new(
repo_root.path(),
&layout,
"ccdrepo_fixture",
ResolvedRecallProvider {
name: "fixture".to_owned(),
kind: ProviderKind::ExternalCommand,
source: ProviderSource::ProfileConfig,
capabilities: RecallCapabilities::all(),
allowed_scopes: vec!["repo".to_owned(), "branch".to_owned(), "clone".to_owned()],
inputs,
backend: ProviderBackend::ExternalCommand(ExternalCommandConfig {
mode,
command,
// Short timeout keeps the timeout-path tests fast.
timeout_s: 2,
}),
},
)
}
// Convenience wrapper: one-shot fixture provider with no extra inputs.
fn external_fixture_provider(script_name: &str) -> ResolvedRecallAdapter {
external_fixture_provider_with(script_name, ExternalCommandMode::OneShot, BTreeMap::new())
}
// Minimal blocking HTTP server emulating the Graphiti REST API for tests.
struct GraphitiMockServer {
// Base URL of the server, e.g. "http://127.0.0.1:<port>".
address: String,
// Signals the accept loop to stop.
shutdown_tx: mpsc::Sender<()>,
// Accept-loop thread; taken and joined on drop.
handle: Option<thread::JoinHandle<()>>,
}
impl GraphitiMockServer {
// Binds an ephemeral localhost port and serves connections on a background
// thread until a shutdown message arrives.
fn start() -> Self {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock Graphiti server");
listener
.set_nonblocking(true)
.expect("set nonblocking listener");
let address = format!("http://{}", listener.local_addr().expect("listener addr"));
let (shutdown_tx, shutdown_rx) = mpsc::channel();
let handle = thread::spawn(move || loop {
if shutdown_rx.try_recv().is_ok() {
break;
}
match listener.accept() {
Ok((mut stream, _)) => {
handle_graphiti_connection(&mut stream);
}
// Non-blocking accept: back off briefly when no client is pending
// so the shutdown channel is polled regularly.
Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
}
Err(error) => panic!("accept failed: {error}"),
}
});
Self {
address,
shutdown_tx,
handle: Some(handle),
}
}
}
impl Drop for GraphitiMockServer {
// Stops the accept loop and joins the server thread so tests cannot leak it.
fn drop(&mut self) {
let _ = self.shutdown_tx.send(());
if let Some(handle) = self.handle.take() {
handle.join().expect("join mock Graphiti server");
}
}
}
// Reads one HTTP request from the stream and returns
// (request line, request target, raw body bytes).
fn read_http_request(stream: &mut TcpStream) -> (String, String, Vec<u8>) {
let mut buffer = Vec::new();
let mut chunk = [0_u8; 1024];
let mut header_end = None;
let mut body_len = 0_usize;
loop {
let read = stream.read(&mut chunk).expect("read request");
if read == 0 {
// EOF: parse whatever has arrived so far.
break;
}
buffer.extend_from_slice(&chunk[..read]);
// Once the header terminator is seen, learn the declared body length.
if header_end.is_none() {
if let Some(pos) = find_header_end(&buffer) {
header_end = Some(pos);
let headers = String::from_utf8_lossy(&buffer[..pos]);
body_len = content_length(&headers);
}
}
// Stop as soon as the full declared body has arrived.
if let Some(pos) = header_end {
if buffer.len() >= pos + body_len {
break;
}
}
}
let pos = header_end.expect("header end");
let headers = String::from_utf8(buffer[..pos].to_vec()).expect("header text");
let request_line = headers.lines().next().expect("request line").to_owned();
// Second whitespace-separated field of the request line is the target.
let target = request_line
.split_whitespace()
.nth(1)
.expect("request target")
.to_owned();
let body = buffer[pos..].to_vec();
(request_line, target, body)
}
// Returns the index just past the first CRLFCRLF header terminator, if any.
fn find_header_end(buffer: &[u8]) -> Option<usize> {
    if buffer.len() < 4 {
        return None;
    }
    (0..=buffer.len() - 4)
        .find(|&pos| &buffer[pos..pos + 4] == b"\r\n\r\n")
        .map(|pos| pos + 4)
}
// Case-insensitive Content-Length lookup; a missing or unparseable header
// means an empty body (0).
fn content_length(headers: &str) -> usize {
    for line in headers.lines() {
        let Some((name, value)) = line.split_once(':') else {
            continue;
        };
        if !name.eq_ignore_ascii_case("content-length") {
            continue;
        }
        if let Ok(length) = value.trim().parse::<usize>() {
            return length;
        }
    }
    0
}
// Value of the first `key=value` pair in the target's query string. Returns
// None when there is no query string, no matching key, or a pair without `=`.
fn query_param(target: &str, key: &str) -> Option<String> {
    let (_, query) = target.split_once('?')?;
    for pair in query.split('&') {
        let (name, value) = pair.split_once('=')?;
        if name == key {
            return Some(value.to_owned());
        }
    }
    None
}
// Writes a minimal HTTP/1.1 JSON response with Connection: close.
fn json_response(stream: &mut TcpStream, status: &str, value: JsonValue) {
let body = serde_json::to_vec(&value).expect("response json");
write!(
stream,
"HTTP/1.1 {status}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
)
.expect("write response headers");
stream.write_all(&body).expect("write response body");
stream.flush().expect("flush response");
}
// Canned Graphiti entity-edge payload keyed by `group_id`, with fixed
// temporal fields so tests can assert exact values.
fn graphiti_fact(group_id: &str) -> JsonValue {
json!({
"uuid": format!("{group_id}-edge-1"),
"name": format!("{group_id} relationship"),
"fact": format!("Graphiti fact for {group_id} keeps temporal scope mapping explicit."),
"valid_at": "2026-03-01T10:00:00Z",
"invalid_at": "2026-04-02T00:00:00Z",
"created_at": "2026-03-31T09:00:00Z",
"expired_at": "2026-04-03T00:00:00Z"
})
}
// Routes a single mock-Graphiti connection: /healthcheck, /search (empty
// facts for the sentinel query "no-match"), /entity-edge/<id>, else 404.
fn handle_graphiti_connection(stream: &mut TcpStream) {
let (_request_line, target, body) = read_http_request(stream);
// Strip the query string before routing on the path.
let path = target
.split_once('?')
.map(|(path, _)| path)
.unwrap_or(&target);
match path {
"/healthcheck" => {
json_response(stream, "200 OK", json!({"status": "healthy"}));
}
"/search" => {
let payload: JsonValue = serde_json::from_slice(&body).expect("search payload");
let query = payload["query"].as_str().unwrap_or_default();
let group_id = payload["group_ids"][0].as_str().unwrap_or("repo-graph");
let facts = if query == "no-match" {
Vec::new()
} else {
vec![graphiti_fact(group_id)]
};
json_response(stream, "200 OK", json!({ "facts": facts }));
}
_ if path.starts_with("/entity-edge/") => {
let group_id = query_param(&target, "group_id").expect("group_id query");
json_response(stream, "200 OK", graphiti_fact(&group_id));
}
_ => {
json_response(
stream,
"404 Not Found",
json!({"detail": format!("unexpected path: {target}")}),
);
}
}
}
// Builds a recall adapter running the real graphiti_adapter.py script against
// the mock server, with group-id inputs for the branch and repo scopes.
// NOTE(review): like the fixture helper, the TempDir backing `repo_root` is
// dropped on return — confirm the adapter only needs the path string.
fn graphiti_script_provider(server: &GraphitiMockServer) -> ResolvedRecallAdapter {
let repo_root = tempdir().expect("tempdir");
std::process::Command::new("git")
.arg("init")
.arg(repo_root.path())
.output()
.expect("git init");
let layout =
StateLayout::resolve(repo_root.path(), profile::resolve(None).expect("profile"))
.expect("layout");
let script = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("scripts/memory/graphiti_adapter.py")
.canonicalize()
.expect("graphiti adapter path");
let mut inputs = BTreeMap::new();
inputs.insert("base_url".to_owned(), server.address.clone());
inputs.insert("group_id_branch".to_owned(), "branch-graph".to_owned());
inputs.insert("group_id_repo".to_owned(), "repo-graph".to_owned());
ResolvedRecallAdapter::new(
repo_root.path(),
&layout,
"ccdrepo_fixture",
ResolvedRecallProvider {
name: "graphiti".to_owned(),
kind: ProviderKind::ExternalCommand,
source: ProviderSource::ProfileConfig,
capabilities: RecallCapabilities::all(),
allowed_scopes: vec!["branch".to_owned(), "repo".to_owned()],
inputs,
backend: ProviderBackend::ExternalCommand(ExternalCommandConfig {
mode: ExternalCommandMode::OneShot,
command: vec![script.display().to_string()],
timeout_s: 2,
}),
},
)
}
// Builtin search should match only the entry containing the query terms and
// report the builtin provider name and the entry's scope.
#[test]
fn builtin_markdown_search_matches_query() {
let loaded = loaded_runtime_with_entries(vec![
markdown_entry("Mem0 adapter keeps provider output additive."),
markdown_entry("Retry worker invariants stay local."),
]);
let results = search_authored_markdown(&loaded, &["clone".to_owned()], "Mem0 additive", 3);
assert_eq!(results.len(), 1);
assert_eq!(results[0].reference.provider, "markdown");
assert_eq!(results[0].scope, "clone");
}
// Structured entries should be addressable by their structured id, and
// describe must attach the full content as detail.
#[test]
fn builtin_markdown_describe_uses_native_id() {
let entry = RuntimeMemoryEntry::from_structured_entry(
crate::memory::entries::StructuredMemoryEntry {
id: "mem_rule".to_owned(),
entry_type: "rule".to_owned(),
state: "active".to_owned(),
created_at: "2026-03-31T00:00:00Z".to_owned(),
last_touched_session: 1,
origin: "manual".to_owned(),
superseded_at: None,
decay_class: None,
expires_at: None,
tags: Vec::new(),
source_ref: None,
supersedes: Vec::new(),
content: "Always keep provider recall additive.".to_owned(),
},
);
let loaded = loaded_runtime_with_entries(vec![entry]);
let results = authored_markdown_describe(&loaded, "mem_rule").expect("describe");
assert_eq!(results[0].reference.native_id, "mem_rule");
assert!(results[0].detail.as_deref().unwrap().contains("additive"));
}
// Merging a repo overlay over the profile config should take the overlay's
// scalar selections (recall/ingest provider names) while keeping provider
// entries from both layers available.
#[test]
fn merge_memory_config_prefers_overlay_scalars_and_provider_entries() {
let mut profile = RawMemoryConfig::default();
profile.recall_provider = Some("profile".to_owned());
profile.ingest_provider = Some("profile-ingest".to_owned());
profile.providers.insert(
"profile".to_owned(),
RawMemoryProviderConfig {
kind: Some("external-command".to_owned()),
mode: None,
command: Some(vec!["profile-adapter".to_owned()]),
timeout_s: Some(5),
capabilities: Some(
REQUIRED_EXTERNAL_CAPABILITIES
.iter()
.map(|value| (*value).to_owned())
.collect(),
),
allowed_scopes: Some(vec!["clone".to_owned()]),
inputs: BTreeMap::new(),
},
);
profile.ingest.insert(
"profile-ingest".to_owned(),
RawMemoryProviderConfig {
kind: Some("external-command".to_owned()),
mode: None,
command: Some(vec!["profile-ingest-adapter".to_owned()]),
timeout_s: Some(5),
capabilities: Some(vec!["sync-status".to_owned(), "source-map".to_owned()]),
allowed_scopes: Some(vec!["clone".to_owned()]),
inputs: BTreeMap::new(),
},
);
let mut overlay = RawMemoryConfig::default();
overlay.recall_provider = Some("overlay".to_owned());
overlay.ingest_provider = Some("overlay-ingest".to_owned());
overlay.providers.insert(
"overlay".to_owned(),
RawMemoryProviderConfig {
kind: Some("authored-markdown".to_owned()),
mode: None,
command: None,
timeout_s: None,
capabilities: None,
allowed_scopes: None,
inputs: BTreeMap::new(),
},
);
overlay.ingest.insert(
"overlay-ingest".to_owned(),
RawMemoryProviderConfig {
kind: Some("external-command".to_owned()),
mode: None,
command: Some(vec!["overlay-ingest-adapter".to_owned()]),
timeout_s: Some(5),
capabilities: Some(vec!["sync-status".to_owned(), "source-map".to_owned()]),
allowed_scopes: Some(vec!["repo".to_owned()]),
inputs: BTreeMap::new(),
},
);
let merged = merge_memory_config(&profile, Some(&overlay));
// Overlay scalars win...
assert_eq!(
merged.configured_recall_provider.as_deref(),
Some("overlay")
);
assert_eq!(
merged.configured_ingest_provider.as_deref(),
Some("overlay-ingest")
);
// ...but provider entries from both layers remain addressable.
assert!(merged.providers.contains_key("profile"));
assert!(merged.providers.contains_key("overlay"));
assert!(merged.ingest_providers.contains_key("profile-ingest"));
assert!(merged.ingest_providers.contains_key("overlay-ingest"));
}
// An unrecognized start_recall_policy value must disable recall (fail
// closed) and record exactly one config issue.
#[test]
fn invalid_start_policy_fails_closed() {
let resolved = builtin_markdown_provider(ProviderSource::BuiltinFallback);
let mut issues = Vec::new();
let policy = resolve_start_recall_policy(
Some("surprise"),
Some("markdown"),
&resolved,
&mut issues,
"config.toml".to_owned(),
);
assert_eq!(policy, StartRecallPolicy::Disabled);
assert_eq!(issues.len(), 1);
}
// Inspecting from a directory with no linked repo should report
// `context_unavailable`.
// NOTE(review): this test mutates the process-wide current directory and
// restores it only on the success path; a panic mid-test (or a parallel test
// relying on cwd) could interfere — confirm whether this needs a guard or
// serial execution.
#[test]
fn inspect_current_dir_without_linked_repo_is_context_unavailable() {
let dir = tempdir().expect("tempdir");
let old = env::current_dir().expect("cwd");
env::set_current_dir(dir.path()).expect("set cwd");
let view = inspect_current_dir_for_describe();
env::set_current_dir(old).expect("restore cwd");
assert_eq!(view.status, "context_unavailable");
}
// Scope names outside the valid set must be rejected with a clear error.
#[test]
fn normalize_scopes_rejects_unknown_values() {
let error = normalize_scopes(vec!["weird".to_owned()]).expect_err("invalid scope");
assert!(error.to_string().contains("unsupported scope"));
}
// The builtin provider must advertise every valid scope, in order.
#[test]
fn builtin_markdown_provider_reports_all_scopes() {
let provider = builtin_markdown_provider(ProviderSource::BuiltinFallback);
assert_eq!(
provider.allowed_scopes,
VALID_SCOPES
.iter()
.map(|s| (*s).to_owned())
.collect::<Vec<_>>()
);
}
// The shell fixture adapter should pass the shared conformance harness.
#[test]
fn external_command_provider_conforms_with_shell_fixture() {
let mut provider = external_fixture_provider("test-memory-provider.sh");
assert_provider_conforms(&mut provider, "provider conformance")
.expect("provider should satisfy the conformance harness");
}
// Searching for the fixture's sentinel "no-match" query must yield an empty
// result set rather than an error.
#[test]
fn external_command_provider_returns_empty_search_for_no_match() {
let mut provider = external_fixture_provider("test-memory-provider.sh");
assert_provider_returns_empty_search(&mut provider, "no-match")
.expect("fixture should return an empty search response");
}
// Scope, provenance, and temporal hints from the fixture's response must map
// onto the detailed recall result fields unchanged.
#[test]
fn external_command_provider_maps_scope_provenance_and_temporal_fields() {
let mut provider = external_fixture_provider("test-memory-provider.sh");
let result = provider
.search("provider conformance")
.expect("search should succeed")
.into_iter()
.next()
.expect("fixture result");
assert_eq!(result.ccd_scope, crate::memory::recall::RecallScope::Branch);
assert_eq!(
result.provider_ref.provider_scope.as_deref(),
Some("branch")
);
assert_eq!(
result.provenance.provider_uri.as_deref(),
Some("https://example.com/branch:0")
);
assert_eq!(
result.provenance.surface_path.as_deref(),
Some("/tmp/fixture-memory-provider")
);
assert_eq!(
result.temporal.observed_at.as_deref(),
Some("2026-03-31T09:00:00Z")
);
assert_eq!(
result.temporal.updated_at.as_deref(),
Some("2026-03-31T09:15:00Z")
);
assert_eq!(
result.temporal.valid_at.as_deref(),
Some("2026-04-01T00:00:00Z")
);
}
// When a provider answers describe with a `results` list, the adapter should
// surface the first item.
#[test]
fn external_command_describe_prefers_first_result_from_results_list() {
let mut provider = external_fixture_provider("test-memory-provider.sh");
let result = provider
.describe(&DetailedProviderReference {
provider_name: "fixture".to_owned(),
provider_kind: "external-command".to_owned(),
provider_id: "multi:0".to_owned(),
provider_scope: Some("branch".to_owned()),
})
.expect("describe should succeed")
.expect("describe should return the first result");
assert_eq!(result.provider_ref.provider_id, "multi:0");
assert_eq!(result.title, "First describe item");
}
#[test]
fn command_session_provider_reuses_single_child_across_recall_ops() {
    // Three recall operations against one session provider should spawn the
    // child process exactly once and tear it down exactly once.
    let log_dir = tempdir().expect("tempdir");
    let log_path = log_dir.path().join("session.log");
    let mut inputs = BTreeMap::new();
    inputs.insert("state_file".to_owned(), log_path.display().to_string());
    {
        let mut provider = external_fixture_provider_with(
            "test-memory-provider-session.py",
            ExternalCommandMode::CommandSession,
            inputs,
        );
        let searched = provider
            .search("provider conformance")
            .expect("search should succeed");
        let described = provider
            .describe(&searched[0].provider_ref)
            .expect("describe should succeed")
            .expect("describe result");
        provider
            .expand(&described.provider_ref)
            .expect("expand should succeed");
        // Provider dropped here, which ends the session and writes "exit".
    }
    // The fixture appends one line per lifecycle event to the state file.
    let log = fs::read_to_string(&log_path).expect("session log");
    let count_with_prefix =
        |prefix: &str| log.lines().filter(|line| line.starts_with(prefix)).count();
    assert_eq!(count_with_prefix("spawn "), 1);
    assert_eq!(count_with_prefix("exit "), 1);
    assert_eq!(count_with_prefix("op "), 3);
}
#[test]
fn command_session_provider_respawns_after_timeout() {
    // A hung first search should time out; the next call must transparently
    // respawn the child process and succeed, so the log records two spawns.
    let log_dir = tempdir().expect("tempdir");
    let log_path = log_dir.path().join("session-timeout.log");
    let mut inputs = BTreeMap::new();
    inputs.insert("state_file".to_owned(), log_path.display().to_string());
    inputs.insert("hang_once_on".to_owned(), "search".to_owned());
    let mut provider = external_fixture_provider_with(
        "test-memory-provider-session.py",
        ExternalCommandMode::CommandSession,
        inputs,
    );
    let first_error = provider
        .search("provider conformance")
        .expect_err("first search should time out");
    assert!(first_error.to_string().contains("timed out"));
    let second = provider
        .search("provider conformance")
        .expect("second search should respawn and succeed");
    assert!(!second.is_empty());
    // Drop before reading the log so the final session state is flushed.
    drop(provider);
    let log = fs::read_to_string(&log_path).expect("session log");
    let count_with_prefix =
        |prefix: &str| log.lines().filter(|line| line.starts_with(prefix)).count();
    assert_eq!(count_with_prefix("spawn "), 2);
    assert_eq!(count_with_prefix("op search "), 2);
}
#[test]
fn graphiti_adapter_conforms_with_mock_server() {
    // The Graphiti adapter must pass the same conformance harness as the
    // command providers, backed by a local mock HTTP server.
    let server = GraphitiMockServer::start();
    let mut provider = graphiti_script_provider(&server);
    let outcome = assert_provider_conforms(&mut provider, "provider conformance");
    outcome.expect("graphiti adapter should satisfy the conformance harness");
}
#[test]
fn graphiti_adapter_returns_empty_search_for_no_match() {
    // A query the mock server does not recognize should produce no results.
    let server = GraphitiMockServer::start();
    let mut provider = graphiti_script_provider(&server);
    let outcome = assert_provider_returns_empty_search(&mut provider, "no-match");
    outcome.expect("graphiti adapter should return an empty search response");
}
#[test]
fn graphiti_adapter_maps_group_scope_and_temporal_fields() {
    // Search results from the mock Graphiti server should be translated into
    // the detailed recall model: the group id becomes the provider scope, the
    // edge URI is reconstructed against the server address, and the temporal
    // fields are carried through.
    let server = GraphitiMockServer::start();
    let mut provider = graphiti_script_provider(&server);
    let result = provider
        .search("provider conformance")
        .expect("search should succeed")
        .into_iter()
        .next()
        .expect("graphiti result");

    assert_eq!(result.ccd_scope, crate::memory::recall::RecallScope::Branch);
    assert_eq!(
        result.provider_ref.provider_scope.as_deref(),
        Some("branch-graph")
    );
    assert_eq!(
        result.provenance.source_ref.as_deref(),
        Some("graphiti://groups/branch-graph/entity-edge/branch-graph-edge-1")
    );
    let expected_uri = format!(
        "{}/entity-edge/branch-graph-edge-1?group_id=branch-graph",
        server.address
    );
    assert_eq!(
        result.provenance.provider_uri.as_deref(),
        Some(expected_uri.as_str())
    );
    assert_eq!(result.provenance.surface_path.as_deref(), Some("/search"));
    assert_eq!(
        result.temporal.observed_at.as_deref(),
        Some("2026-03-31T09:00:00Z")
    );
    assert_eq!(
        result.temporal.valid_at.as_deref(),
        Some("2026-03-01T10:00:00Z")
    );
    assert_eq!(result.temporal.updated_at, None);
    assert!(!result.can_expand);

    // Describe surfaces the additional bi-temporal fields in its detail text.
    let described = provider
        .describe(&result.provider_ref)
        .expect("describe should succeed")
        .expect("describe result");
    let detail = described.detail.as_deref().expect("describe detail");
    assert!(detail.contains("invalid_at: 2026-04-02T00:00:00Z"));
    assert!(detail.contains("expired_at: 2026-04-03T00:00:00Z"));
    assert_eq!(
        described.provenance.surface_path.as_deref(),
        Some("/entity-edge/branch-graph-edge-1")
    );
}
}