use crate::product::agent::config::Config;
use crate::product::agent::features::Feature;
use crate::product::agent::path_utils;
use crate::product::agent::rollout::list::Cursor;
use crate::product::agent::rollout::list::ThreadSortKey;
use crate::product::agent::rollout::metadata;
use crate::product::agent::rollout::recorder::is_unsupported_rollout_schema_anyhow;
use crate::product::otel::OtelManager;
use crate::product::protocol::ThreadId;
use crate::product::protocol::protocol::RolloutItem;
use crate::product::protocol::protocol::SessionSource;
use crate::product::state::DB_METRIC_COMPARE_ERROR;
pub use crate::product::state::LogEntry;
use crate::product::state::MemoryStoreMode;
use crate::product::state::STATE_DB_FILENAME;
use crate::product::state::ThreadMetadataBuilder;
use chrono::DateTime;
use chrono::NaiveDateTime;
use chrono::Timelike;
use chrono::Utc;
use serde_json::Value;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::warn;
use uuid::Uuid;
/// Shared, reference-counted handle to the state runtime.
///
/// This is what `init_if_enabled`, `get_state_db`, and `open_if_present` hand out; cloning it
/// only bumps the `Arc` refcount.
pub type StateDbHandle = Arc<crate::product::state::StateRuntime>;
/// Initializes the state runtime when any state-backed feature is enabled.
///
/// Returns `None` if none of the `Sqlite`, `Goals`, or `MemoryTool` features are enabled, or if
/// `StateRuntime::init_with_memory_store` fails. A failure is logged, and when `otel` is present
/// a `lha.db.init` counter with `status=init_error` is recorded.
///
/// The memory store mode is `Required` when `MemoryTool` is enabled and `Disabled` otherwise.
/// If the state DB file was not present before initialization, `metadata::backfill_sessions`
/// runs on a detached tokio task against the new runtime.
pub(crate) async fn init_if_enabled(
    config: &Config,
    otel: Option<&OtelManager>,
) -> Option<StateDbHandle> {
    if !state_db_feature_enabled(config) {
        return None;
    }
    let db_file = config.lha_home.join(STATE_DB_FILENAME);
    // Sampled before initialization so a database that did not exist yet triggers a backfill.
    let is_fresh_db = !tokio::fs::try_exists(&db_file).await.unwrap_or(false);
    let store_mode = if !config.features.enabled(Feature::MemoryTool) {
        MemoryStoreMode::Disabled
    } else {
        MemoryStoreMode::Required
    };
    let init_result = crate::product::state::StateRuntime::init_with_memory_store(
        config.lha_home.clone(),
        config.model_provider_id.clone(),
        otel.cloned(),
        store_mode,
    )
    .await;
    let runtime = match init_result {
        Ok(ready) => ready,
        Err(err) => {
            warn!(
                "failed to initialize state runtime at {}: {err}",
                config.lha_home.display()
            );
            if let Some(telemetry) = otel {
                telemetry.counter("lha.db.init", 1, &[("status", "init_error")]);
            }
            return None;
        }
    };
    if is_fresh_db {
        // The spawned task owns its own copies; its JoinHandle is dropped, so it runs detached.
        let backfill_runtime = Arc::clone(&runtime);
        let backfill_config = config.clone();
        let backfill_otel = otel.cloned();
        tokio::task::spawn(async move {
            metadata::backfill_sessions(
                backfill_runtime.as_ref(),
                &backfill_config,
                backfill_otel.as_ref(),
            )
            .await;
        });
    }
    Some(runtime)
}
pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
let state_path = config.lha_home.join(STATE_DB_FILENAME);
if !state_db_feature_enabled(config)
|| !tokio::fs::try_exists(&state_path).await.unwrap_or(false)
{
return None;
}
crate::product::state::StateRuntime::init_with_memory_store(
config.lha_home.clone(),
config.model_provider_id.clone(),
otel.cloned(),
MemoryStoreMode::Disabled,
)
.await
.ok()
}
/// Returns `true` when at least one feature that needs the state database is enabled:
/// `Sqlite`, `Goals`, or `MemoryTool` (checked in that order, stopping at the first hit).
fn state_db_feature_enabled(config: &Config) -> bool {
    [Feature::Sqlite, Feature::Goals, Feature::MemoryTool]
        .into_iter()
        .any(|feature| config.features.enabled(feature))
}
pub async fn open_if_present(lha_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
let db_path = lha_home.join(STATE_DB_FILENAME);
if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
return None;
}
let runtime = crate::product::state::StateRuntime::init_with_memory_store(
lha_home.to_path_buf(),
default_provider.to_string(),
None,
MemoryStoreMode::Disabled,
)
.await
.ok()?;
Some(runtime)
}
/// Converts a pagination `Cursor` into a state-db `Anchor`.
///
/// The cursor is serialized to JSON and must be a string shaped `"<timestamp>|<uuid>"` with
/// exactly one `|`. The timestamp may be `%Y-%m-%dT%H-%M-%S`, which is interpreted as UTC, or
/// RFC 3339, which is converted to UTC. Sub-second precision is dropped.
///
/// Returns `None` when `cursor` is `None` or when any piece fails to parse.
fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option<crate::product::state::Anchor> {
    let serialized = serde_json::to_value(cursor?).ok()?;
    let (ts_part, id_part) = serialized.as_str()?.split_once('|')?;
    // Tokens with more than one separator are malformed.
    if id_part.contains('|') {
        return None;
    }
    let id = Uuid::parse_str(id_part).ok()?;
    let parsed_ts = match NaiveDateTime::parse_from_str(ts_part, "%Y-%m-%dT%H-%M-%S") {
        Ok(naive) => DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc),
        Err(_) => DateTime::parse_from_rfc3339(ts_part)
            .ok()?
            .with_timezone(&Utc),
    };
    let ts = parsed_ts.with_nanosecond(0)?;
    Some(crate::product::state::Anchor { ts, id })
}
/// Fetches one page of thread ids from the state database.
///
/// Returns `None` when `context` is `None` or when the query fails; a failure is logged
/// together with `stage`. If `lha_home` differs from the runtime's own home, only a warning is
/// logged and the query still runs against `context`.
///
/// How inputs are translated:
/// - `cursor` goes through `cursor_to_anchor`; an unparseable cursor becomes "no anchor".
/// - Each allowed source is converted to its JSON string form, or an empty string if
///   serialization fails.
/// - A `cwd_filter` switches to a filtered scan over `list_threads`.
// The parameters mirror the state runtime's query arguments plus the logging `stage`.
#[allow(clippy::too_many_arguments)]
pub async fn list_thread_ids_db(
    context: Option<&crate::product::state::StateRuntime>,
    lha_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    model_providers: Option<&[String]>,
    cwd_filter: Option<&Path>,
    archived_only: bool,
    stage: &str,
) -> Option<Vec<ThreadId>> {
    let ctx = context?;
    if ctx.lha_home() != lha_home {
        warn!(
            "state db lha_home mismatch: expected {}, got {}",
            ctx.lha_home().display(),
            lha_home.display()
        );
    }
    let anchor = cursor_to_anchor(cursor);
    let allowed_sources: Vec<String> = allowed_sources
        .iter()
        .map(|value| match serde_json::to_value(value) {
            Ok(Value::String(s)) => s,
            Ok(other) => other.to_string(),
            Err(_) => String::new(),
        })
        .collect();
    let sort_key = match sort_key {
        ThreadSortKey::CreatedAt => crate::product::state::SortKey::CreatedAt,
        ThreadSortKey::UpdatedAt => crate::product::state::SortKey::UpdatedAt,
    };
    // `model_providers` is forwarded as-is. The previous `to_vec` + `as_deref` round-trip
    // allocated a copy only to re-borrow the same `Option<&[String]>`.
    let result = if let Some(cwd_filter) = cwd_filter {
        collect_thread_ids_with_cwd_filter(
            ctx,
            page_size,
            anchor.as_ref(),
            sort_key,
            allowed_sources.as_slice(),
            model_providers,
            cwd_filter,
            archived_only,
        )
        .await
    } else {
        ctx.list_thread_ids(
            page_size,
            anchor.as_ref(),
            sort_key,
            allowed_sources.as_slice(),
            model_providers,
            archived_only,
        )
        .await
    };
    match result {
        Ok(ids) => Some(ids),
        Err(err) => {
            warn!("state db list_thread_ids failed during {stage}: {err}");
            None
        }
    }
}
/// Pages through threads from `anchor`, keeping ids whose cwd matches `cwd_filter`.
///
/// Matching uses `paths_match`. Scanning stops when `page_size` ids have been collected, when a
/// page comes back empty, or when a page has no `next_anchor`. Every `list_threads` request
/// asks for `page_size` rows. Query errors are propagated to the caller.
// Forwards every `list_threads` argument plus the cwd filter, hence the long signature.
#[allow(clippy::too_many_arguments)]
async fn collect_thread_ids_with_cwd_filter(
    context: &crate::product::state::StateRuntime,
    page_size: usize,
    anchor: Option<&crate::product::state::Anchor>,
    sort_key: crate::product::state::SortKey,
    allowed_sources: &[String],
    model_providers: Option<&[String]>,
    cwd_filter: &Path,
    archived_only: bool,
) -> anyhow::Result<Vec<ThreadId>> {
    let mut matched = Vec::with_capacity(page_size);
    let mut resume_from = anchor.cloned();
    while matched.len() < page_size {
        let page = context
            .list_threads(
                page_size,
                resume_from.as_ref(),
                sort_key,
                allowed_sources,
                model_providers,
                archived_only,
            )
            .await?;
        if page.items.is_empty() {
            break;
        }
        // `take` stops pulling once the page budget is filled. Because `filter` is lazy,
        // `paths_match` is never evaluated for items past the last one kept.
        let remaining = page_size - matched.len();
        matched.extend(
            page.items
                .into_iter()
                .filter(|item| paths_match(item.cwd.as_path(), cwd_filter))
                .map(|item| item.id)
                .take(remaining),
        );
        match page.next_anchor {
            Some(next) => resume_from = Some(next),
            None => break,
        }
    }
    Ok(matched)
}
/// Reports whether two paths are equal after normalization.
///
/// Both paths go through `path_utils::normalize_for_path_comparison`. If either
/// normalization fails, the raw paths are compared directly instead.
fn paths_match(a: &Path, b: &Path) -> bool {
    match (
        path_utils::normalize_for_path_comparison(a),
        path_utils::normalize_for_path_comparison(b),
    ) {
        (Ok(left), Ok(right)) => left == right,
        _ => a == b,
    }
}
/// Looks up the rollout file path recorded for `thread_id`.
///
/// `archived_only` is forwarded unchanged to the runtime query. Returns `None` in three cases:
/// `context` is `None`, the runtime has no path for the thread, or the query fails. A query
/// failure is logged with `stage`.
pub async fn find_rollout_path_by_id(
    context: Option<&crate::product::state::StateRuntime>,
    thread_id: ThreadId,
    archived_only: Option<bool>,
    stage: &str,
) -> Option<PathBuf> {
    let ctx = context?;
    match ctx.find_rollout_path_by_id(thread_id, archived_only).await {
        Ok(maybe_path) => maybe_path,
        Err(err) => {
            warn!("state db find_rollout_path_by_id failed during {stage}: {err}");
            None
        }
    }
}
/// Brings the state database's record for `rollout_path` up to date.
///
/// If a `builder` or a non-empty `items` slice is supplied, the work is delegated to
/// `apply_rollout_items` under stage `"reconcile_rollout"`. Otherwise metadata is extracted
/// from the rollout file and upserted.
///
/// Every failure is logged and swallowed. Rollouts whose schema is reported as unsupported
/// get their own "skipping" warning. Does nothing when `context` is `None`.
pub async fn reconcile_rollout(
    context: Option<&crate::product::state::StateRuntime>,
    rollout_path: &Path,
    default_provider: &str,
    builder: Option<&ThreadMetadataBuilder>,
    items: &[RolloutItem],
) {
    let Some(ctx) = context else {
        return;
    };
    let has_inline_data = builder.is_some() || !items.is_empty();
    if has_inline_data {
        apply_rollout_items(
            Some(ctx),
            rollout_path,
            default_provider,
            builder,
            items,
            "reconcile_rollout",
        )
        .await;
        return;
    }
    let extracted =
        metadata::extract_metadata_from_rollout(rollout_path, default_provider, None).await;
    let outcome = match extracted {
        Ok(found) => found,
        Err(err) if is_unsupported_rollout_schema_anyhow(&err) => {
            warn!(
                "skipping unsupported legacy rollout {}",
                rollout_path.display()
            );
            return;
        }
        Err(err) => {
            warn!(
                "state db reconcile_rollout extraction failed {}: {err}",
                rollout_path.display()
            );
            return;
        }
    };
    let upserted = ctx.upsert_thread(&outcome.metadata).await;
    if let Err(err) = upserted {
        warn!(
            "state db reconcile_rollout upsert failed {}: {err}",
            rollout_path.display()
        );
    }
}
/// Applies rollout `items` to the thread metadata stored for `rollout_path`.
///
/// Uses a clone of `builder` when one is provided. Otherwise a builder is derived with
/// `metadata::builder_from_items`. If neither yields a builder, the function logs a warning,
/// records a `missing_builder` discrepancy for `stage`, and writes nothing.
///
/// The builder's `rollout_path` is always overwritten with `rollout_path`. Runtime failures are
/// logged rather than returned. `_default_provider` is currently unused. Does nothing when
/// `context` is `None`.
pub async fn apply_rollout_items(
    context: Option<&crate::product::state::StateRuntime>,
    rollout_path: &Path,
    _default_provider: &str,
    builder: Option<&ThreadMetadataBuilder>,
    items: &[RolloutItem],
    stage: &str,
) {
    let Some(ctx) = context else {
        return;
    };
    // Prefer the caller's builder; fall back to reconstructing one from the items.
    let Some(mut builder) = builder
        .cloned()
        .or_else(|| metadata::builder_from_items(items, rollout_path))
    else {
        warn!(
            "state db apply_rollout_items missing builder during {stage}: {}",
            rollout_path.display()
        );
        record_discrepancy(stage, "missing_builder");
        return;
    };
    builder.rollout_path = rollout_path.to_path_buf();
    let applied = ctx.apply_rollout_items(&builder, items, None).await;
    if let Err(err) = applied {
        warn!(
            "state db apply_rollout_items failed during {stage} for {}: {err}",
            rollout_path.display()
        );
    }
}
pub fn record_discrepancy(stage: &str, reason: &str) {
tracing::warn!("state db record_discrepancy: {stage}{reason}");
if let Some(metric) = crate::product::otel::metrics::global() {
let _ = metric.counter(
DB_METRIC_COMPARE_ERROR,
1,
&[("stage", stage), ("reason", reason)],
);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::product::agent::rollout::list::parse_cursor;
    use pretty_assertions::assert_eq;

    /// Creates a temporary home directory whose memories DB file holds non-SQLite bytes.
    /// The returned `TempDir` must stay alive for as long as the directory is needed.
    async fn home_with_corrupt_memories_db() -> tempfile::TempDir {
        let home = tempfile::tempdir().expect("tempdir");
        let memories_db = home
            .path()
            .join(crate::product::state::MEMORIES_DB_FILENAME);
        tokio::fs::write(memories_db, "not sqlite")
            .await
            .expect("write corrupt memories db");
        home
    }

    #[tokio::test]
    async fn init_if_enabled_without_memory_feature_ignores_corrupt_memory_db() {
        let home = home_with_corrupt_memories_db().await;
        let mut config = crate::product::agent::config::test_config();
        config.lha_home = home.path().to_path_buf();
        config.features.enable(Feature::Goals);
        config.features.disable(Feature::MemoryTool);

        let runtime = init_if_enabled(&config, None)
            .await
            .expect("state runtime should initialize");

        assert!(runtime.memories().is_none());
    }

    #[tokio::test]
    async fn init_if_enabled_with_memory_feature_rejects_corrupt_memory_db() {
        let home = home_with_corrupt_memories_db().await;
        let mut config = crate::product::agent::config::test_config();
        config.lha_home = home.path().to_path_buf();
        config.features.enable(Feature::MemoryTool);

        let runtime = init_if_enabled(&config, None).await;

        assert!(runtime.is_none());
    }

    #[test]
    fn cursor_to_anchor_normalizes_timestamp_format() {
        const TS: &str = "2026-01-27T12-34-56";
        let id = Uuid::new_v4();
        let token = format!("{TS}|{id}");
        let cursor = parse_cursor(token.as_str()).expect("cursor should parse");

        let anchor = cursor_to_anchor(Some(&cursor)).expect("anchor should parse");

        let expected_ts = NaiveDateTime::parse_from_str(TS, "%Y-%m-%dT%H-%M-%S")
            .map(|naive| DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
            .expect("ts should parse")
            .with_nanosecond(0)
            .expect("nanosecond");
        assert_eq!(anchor.id, id);
        assert_eq!(anchor.ts, expected_ts);
    }
}