lha 1.0.2

Long-Horizon Agent command-line package that installs the lha binary.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
use crate::product::agent::config::Config;
use crate::product::agent::features::Feature;
use crate::product::agent::path_utils;
use crate::product::agent::rollout::list::Cursor;
use crate::product::agent::rollout::list::ThreadSortKey;
use crate::product::agent::rollout::metadata;
use crate::product::agent::rollout::recorder::is_unsupported_rollout_schema_anyhow;
use crate::product::otel::OtelManager;
use crate::product::protocol::ThreadId;
use crate::product::protocol::protocol::RolloutItem;
use crate::product::protocol::protocol::SessionSource;
use crate::product::state::DB_METRIC_COMPARE_ERROR;
pub use crate::product::state::LogEntry;
use crate::product::state::MemoryStoreMode;
use crate::product::state::STATE_DB_FILENAME;
use crate::product::state::ThreadMetadataBuilder;
use chrono::DateTime;
use chrono::NaiveDateTime;
use chrono::Timelike;
use chrono::Utc;
use serde_json::Value;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::warn;
use uuid::Uuid;

/// Core-facing handle to the optional SQLite-backed state runtime.
///
/// The handle is an [`Arc`] around the runtime, so cloning it shares the same underlying
/// runtime rather than creating a new one. The constructors in this module return
/// `Option<StateDbHandle>` and yield `None` when no runtime is available.
pub type StateDbHandle = Arc<crate::product::state::StateRuntime>;

/// Start the state runtime if at least one persistence-backed feature is enabled.
///
/// Intended for use inside `core` only; initialization should not be performed anywhere else.
///
/// The memory store is opened as `Required` when `Feature::MemoryTool` is enabled and as
/// `Disabled` otherwise.
///
/// Returns `None` when no persistence-backed feature is enabled, or when the runtime fails to
/// initialize. An initialization failure is logged and, when `otel` is provided, counted under
/// `lha.db.init` with `status = init_error`.
///
/// If the state DB file was not present before initialization, a detached task is spawned to
/// backfill sessions into the freshly created runtime.
pub(crate) async fn init_if_enabled(
    config: &Config,
    otel: Option<&OtelManager>,
) -> Option<StateDbHandle> {
    if !state_db_feature_enabled(config) {
        return None;
    }

    // Probe for the file *before* initializing so that a first-time creation can be detected
    // afterwards. A failed probe is treated the same as "file absent".
    let db_file = config.lha_home.join(STATE_DB_FILENAME);
    let db_already_present = matches!(tokio::fs::try_exists(&db_file).await, Ok(true));

    let memory_tool_enabled = config.features.enabled(Feature::MemoryTool);
    let memory_store_mode = if memory_tool_enabled {
        MemoryStoreMode::Required
    } else {
        MemoryStoreMode::Disabled
    };

    let init_result = crate::product::state::StateRuntime::init_with_memory_store(
        config.lha_home.clone(),
        config.model_provider_id.clone(),
        otel.cloned(),
        memory_store_mode,
    )
    .await;
    let runtime = match init_result {
        Err(err) => {
            warn!(
                "failed to initialize state runtime at {}: {err}",
                config.lha_home.display()
            );
            if let Some(otel) = otel {
                otel.counter("lha.db.init", 1, &[("status", "init_error")]);
            }
            return None;
        }
        Ok(runtime) => runtime,
    };

    if !db_already_present {
        // The spawned task must own everything it touches, so hand it its own copies.
        let backfill_runtime = Arc::clone(&runtime);
        let backfill_config = config.clone();
        let backfill_otel = otel.cloned();
        tokio::task::spawn(async move {
            metadata::backfill_sessions(
                backfill_runtime.as_ref(),
                &backfill_config,
                backfill_otel.as_ref(),
            )
            .await;
        });
    }

    Some(runtime)
}

/// Open the state DB when a persistence-backed feature is enabled and the DB file exists.
///
/// The file is only probed once the feature check has passed; a failed probe counts as
/// "file absent". The runtime is always opened with the memory store disabled.
///
/// Returns `None` if the feature check fails, the file is missing, or initialization returns
/// an error (the error itself is discarded).
pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
    if !state_db_feature_enabled(config) {
        return None;
    }

    let db_file = config.lha_home.join(STATE_DB_FILENAME);
    let db_present = matches!(tokio::fs::try_exists(&db_file).await, Ok(true));
    if !db_present {
        return None;
    }

    let opened = crate::product::state::StateRuntime::init_with_memory_store(
        config.lha_home.clone(),
        config.model_provider_id.clone(),
        otel.cloned(),
        MemoryStoreMode::Disabled,
    )
    .await;
    opened.ok()
}

/// Report whether any feature that relies on the state DB is enabled.
///
/// The features checked, in order, are `Sqlite`, `Goals` and `MemoryTool`; evaluation stops at
/// the first one that is enabled.
fn state_db_feature_enabled(config: &Config) -> bool {
    [Feature::Sqlite, Feature::Goals, Feature::MemoryTool]
        .into_iter()
        .any(|feature| config.features.enabled(feature))
}

/// Open the state runtime whenever the SQLite file exists, ignoring feature flags.
///
/// This is used for parity checks during the SQLite migration phase.
///
/// The runtime is opened without an otel manager and with the memory store disabled. Returns
/// `None` when the file is missing (or cannot be probed) or when initialization fails; the
/// initialization error is discarded.
pub async fn open_if_present(lha_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
    let sqlite_file = lha_home.join(STATE_DB_FILENAME);
    let file_found = matches!(tokio::fs::try_exists(&sqlite_file).await, Ok(true));
    if !file_found {
        return None;
    }

    crate::product::state::StateRuntime::init_with_memory_store(
        lha_home.to_path_buf(),
        default_provider.to_string(),
        None,
        MemoryStoreMode::Disabled,
    )
    .await
    .ok()
}

/// Translate a rollout pagination cursor into a state-DB anchor.
///
/// The cursor is serialized to JSON and must come out as a single string of the form
/// `<timestamp>|<uuid>` containing exactly one `|`. The timestamp is accepted either in the
/// `%Y-%m-%dT%H-%M-%S` form (interpreted as UTC) or as RFC 3339 (converted to UTC). The
/// nanosecond component of the resulting timestamp is set to zero.
///
/// Returns `None` when no cursor is given or when any of the parsing steps fails.
fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option<crate::product::state::Anchor> {
    let serialized = serde_json::to_value(cursor?).ok()?;
    let token = serialized.as_str()?;

    // Exactly two `|`-separated fields are allowed; anything else is rejected.
    let mut fields = token.split('|');
    let (Some(ts_field), Some(id_field), None) = (fields.next(), fields.next(), fields.next())
    else {
        return None;
    };

    let id = Uuid::parse_str(id_field).ok()?;

    let parsed_ts = match NaiveDateTime::parse_from_str(ts_field, "%Y-%m-%dT%H-%M-%S") {
        Ok(naive) => DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc),
        Err(_) => DateTime::parse_from_rfc3339(ts_field)
            .ok()?
            .with_timezone(&Utc),
    };
    let ts = parsed_ts.with_nanosecond(0)?;

    Some(crate::product::state::Anchor { ts, id })
}

/// List thread ids from SQLite for parity checks without rollout scanning.
///
/// * `context` - state runtime to query; `None` makes the function return `None` immediately.
/// * `lha_home` - home directory the caller expects the runtime to use. A mismatch with the
///   runtime's own home is only logged; the query still runs.
/// * `page_size` - maximum number of ids to return.
/// * `cursor` - optional pagination cursor; a cursor that cannot be converted into an anchor is
///   treated as if no cursor had been supplied.
/// * `sort_key` - ordering of the listing.
/// * `allowed_sources` - session sources to include. Each is serialized through `serde_json`;
///   JSON strings are used verbatim, other JSON values use their textual form, and a
///   serialization failure yields an empty string.
/// * `model_providers` - optional provider filter, forwarded unchanged.
/// * `cwd_filter` - when set, only threads whose working directory matches are returned.
/// * `archived_only` - forwarded unchanged to the runtime query.
/// * `stage` - label included in the warning emitted when the query fails.
///
/// Returns `Some(ids)` on success and `None` when there is no runtime or the query fails.
#[allow(clippy::too_many_arguments)]
pub async fn list_thread_ids_db(
    context: Option<&crate::product::state::StateRuntime>,
    lha_home: &Path,
    page_size: usize,
    cursor: Option<&Cursor>,
    sort_key: ThreadSortKey,
    allowed_sources: &[SessionSource],
    model_providers: Option<&[String]>,
    cwd_filter: Option<&Path>,
    archived_only: bool,
    stage: &str,
) -> Option<Vec<ThreadId>> {
    let ctx = context?;
    if ctx.lha_home() != lha_home {
        warn!(
            "state db lha_home mismatch: expected {}, got {}",
            ctx.lha_home().display(),
            lha_home.display()
        );
    }

    let anchor = cursor_to_anchor(cursor);
    let allowed_sources: Vec<String> = allowed_sources
        .iter()
        .map(|value| match serde_json::to_value(value) {
            Ok(Value::String(s)) => s,
            Ok(other) => other.to_string(),
            Err(_) => String::new(),
        })
        .collect();
    let sort_key = match sort_key {
        ThreadSortKey::CreatedAt => crate::product::state::SortKey::CreatedAt,
        ThreadSortKey::UpdatedAt => crate::product::state::SortKey::UpdatedAt,
    };

    // `model_providers` is already a borrowed slice, so it is forwarded as-is instead of being
    // copied into an owned vector and re-borrowed.
    let result = if let Some(cwd_filter) = cwd_filter {
        collect_thread_ids_with_cwd_filter(
            ctx,
            page_size,
            anchor.as_ref(),
            sort_key,
            allowed_sources.as_slice(),
            model_providers,
            cwd_filter,
            archived_only,
        )
        .await
    } else {
        ctx.list_thread_ids(
            page_size,
            anchor.as_ref(),
            sort_key,
            allowed_sources.as_slice(),
            model_providers,
            archived_only,
        )
        .await
    };
    match result {
        Ok(ids) => Some(ids),
        Err(err) => {
            warn!("state db list_thread_ids failed during {stage}: {err}");
            None
        }
    }
}

/// Gather up to `page_size` thread ids whose working directory matches `cwd_filter`.
///
/// Threads are fetched from the runtime one page at a time, starting at `anchor`, and each
/// page is filtered here with [`paths_match`]. Paging continues until enough matches have been
/// found, a page comes back empty, or the runtime reports no further anchor.
///
/// Any error from the runtime's `list_threads` call is returned to the caller.
#[allow(clippy::too_many_arguments)]
async fn collect_thread_ids_with_cwd_filter(
    context: &crate::product::state::StateRuntime,
    page_size: usize,
    anchor: Option<&crate::product::state::Anchor>,
    sort_key: crate::product::state::SortKey,
    allowed_sources: &[String],
    model_providers: Option<&[String]>,
    cwd_filter: &Path,
    archived_only: bool,
) -> anyhow::Result<Vec<ThreadId>> {
    let mut matched = Vec::with_capacity(page_size);
    let mut resume_from = anchor.cloned();

    while matched.len() < page_size {
        let page = context
            .list_threads(
                page_size,
                resume_from.as_ref(),
                sort_key,
                allowed_sources,
                model_providers,
                archived_only,
            )
            .await?;
        if page.items.is_empty() {
            break;
        }

        // The loop condition guarantees at least one free slot here; `take` stops pulling
        // items as soon as the remaining slots are filled.
        let free_slots = page_size - matched.len();
        matched.extend(
            page.items
                .into_iter()
                .filter(|item| paths_match(item.cwd.as_path(), cwd_filter))
                .map(|item| item.id)
                .take(free_slots),
        );

        match page.next_anchor {
            Some(next) => resume_from = Some(next),
            None => break,
        }
    }

    Ok(matched)
}

/// Decide whether two paths refer to the same location for filtering purposes.
///
/// Both paths are normalized via `path_utils::normalize_for_path_comparison`; when both
/// normalizations succeed the normalized forms are compared, otherwise the raw paths are
/// compared as given.
fn paths_match(a: &Path, b: &Path) -> bool {
    let normalized_a = path_utils::normalize_for_path_comparison(a);
    let normalized_b = path_utils::normalize_for_path_comparison(b);
    match (normalized_a, normalized_b) {
        (Ok(left), Ok(right)) => left == right,
        _ => a == b,
    }
}

/// Resolve the rollout file path recorded in SQLite for `thread_id`.
///
/// `archived_only` is forwarded unchanged to the runtime lookup. `stage` is only used to label
/// the warning emitted when the lookup fails.
///
/// Returns `None` when there is no runtime, the lookup finds nothing, or the lookup fails.
pub async fn find_rollout_path_by_id(
    context: Option<&crate::product::state::StateRuntime>,
    thread_id: ThreadId,
    archived_only: Option<bool>,
    stage: &str,
) -> Option<PathBuf> {
    let runtime = context?;
    let lookup = runtime
        .find_rollout_path_by_id(thread_id, archived_only)
        .await;
    match lookup {
        Ok(found) => found,
        Err(err) => {
            warn!("state db find_rollout_path_by_id failed during {stage}: {err}");
            None
        }
    }
}

/// Bring SQLite in line with a rollout, preferring in-memory data over re-reading the file.
///
/// * With no runtime (`context` is `None`) this is a no-op.
/// * If a `builder` is supplied or `items` is non-empty, the work is delegated to
///   [`apply_rollout_items`] under the stage label `reconcile_rollout`.
/// * Otherwise metadata is extracted from the rollout file at `rollout_path` and upserted as a
///   thread.
///
/// Failures are logged as warnings and never returned; a rollout with an unsupported legacy
/// schema is skipped with its own warning.
pub async fn reconcile_rollout(
    context: Option<&crate::product::state::StateRuntime>,
    rollout_path: &Path,
    default_provider: &str,
    builder: Option<&ThreadMetadataBuilder>,
    items: &[RolloutItem],
) {
    let Some(ctx) = context else {
        return;
    };

    let has_in_memory_data = builder.is_some() || !items.is_empty();
    if has_in_memory_data {
        apply_rollout_items(
            Some(ctx),
            rollout_path,
            default_provider,
            builder,
            items,
            "reconcile_rollout",
        )
        .await;
        return;
    }

    // Nothing was handed over in memory: fall back to reading the rollout file itself.
    let extraction =
        metadata::extract_metadata_from_rollout(rollout_path, default_provider, None).await;
    let outcome = match extraction {
        Ok(outcome) => outcome,
        Err(err) if is_unsupported_rollout_schema_anyhow(&err) => {
            warn!(
                "skipping unsupported legacy rollout {}",
                rollout_path.display()
            );
            return;
        }
        Err(err) => {
            warn!(
                "state db reconcile_rollout extraction failed {}: {err}",
                rollout_path.display()
            );
            return;
        }
    };

    if let Err(err) = ctx.upsert_thread(&outcome.metadata).await {
        warn!(
            "state db reconcile_rollout upsert failed {}: {err}",
            rollout_path.display()
        );
    }
}

/// Feed a batch of rollout items into SQLite incrementally.
///
/// The metadata builder is taken from `builder` when provided (it is cloned) and otherwise
/// derived from `items` and `rollout_path`. If neither yields a builder, a warning is logged, a
/// `missing_builder` discrepancy is recorded for `stage`, and nothing is written. The builder's
/// rollout path is always overwritten with `rollout_path` before the items are applied.
///
/// `_default_provider` is accepted but not used. With no runtime this is a no-op. A failure
/// while applying the items is logged, not returned.
pub async fn apply_rollout_items(
    context: Option<&crate::product::state::StateRuntime>,
    rollout_path: &Path,
    _default_provider: &str,
    builder: Option<&ThreadMetadataBuilder>,
    items: &[RolloutItem],
    stage: &str,
) {
    let Some(ctx) = context else {
        return;
    };

    let resolved_builder = builder
        .cloned()
        .or_else(|| metadata::builder_from_items(items, rollout_path));
    let Some(mut builder) = resolved_builder else {
        warn!(
            "state db apply_rollout_items missing builder during {stage}: {}",
            rollout_path.display()
        );
        record_discrepancy(stage, "missing_builder");
        return;
    };

    builder.rollout_path = rollout_path.to_path_buf();
    let applied = ctx.apply_rollout_items(&builder, items, None).await;
    if let Err(err) = applied {
        warn!(
            "state db apply_rollout_items failed during {stage} for {}: {err}",
            rollout_path.display()
        );
    }
}

/// Record a state discrepancy metric with a stage and reason tag.
///
/// A warning naming both `stage` and `reason` is always logged. When a global metrics handle is
/// available, the `DB_METRIC_COMPARE_ERROR` counter is additionally incremented by one with
/// `stage` and `reason` attached as tags; the result of that call is deliberately ignored.
pub fn record_discrepancy(stage: &str, reason: &str) {
    // We access the global metric because the call sites might not have access to the broader
    // OtelManager.
    // Keep a separator between the two values so they do not run together in the log line.
    tracing::warn!("state db record_discrepancy: {stage}, {reason}");
    if let Some(metric) = crate::product::otel::metrics::global() {
        let _ = metric.counter(
            DB_METRIC_COMPARE_ERROR,
            1,
            &[("stage", stage), ("reason", reason)],
        );
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::product::agent::rollout::list::parse_cursor;
    use pretty_assertions::assert_eq;

    /// Write a file that is not a valid SQLite database where the memories DB is expected.
    async fn plant_corrupt_memories_db(home: &Path) {
        let memories_db = home.join(crate::product::state::MEMORIES_DB_FILENAME);
        tokio::fs::write(memories_db, "not sqlite")
            .await
            .expect("write corrupt memories db");
    }

    /// With the memory tool disabled, a corrupt memories DB must not block initialization.
    #[tokio::test]
    async fn init_if_enabled_without_memory_feature_ignores_corrupt_memory_db() {
        let home = tempfile::tempdir().expect("tempdir");
        plant_corrupt_memories_db(home.path()).await;

        let mut config = crate::product::agent::config::test_config();
        config.lha_home = home.path().to_path_buf();
        config.features.enable(Feature::Goals);
        config.features.disable(Feature::MemoryTool);

        let runtime = init_if_enabled(&config, None)
            .await
            .expect("state runtime should initialize");

        assert!(runtime.memories().is_none());
    }

    /// With the memory tool enabled, a corrupt memories DB makes initialization fail.
    #[tokio::test]
    async fn init_if_enabled_with_memory_feature_rejects_corrupt_memory_db() {
        let home = tempfile::tempdir().expect("tempdir");
        plant_corrupt_memories_db(home.path()).await;

        let mut config = crate::product::agent::config::test_config();
        config.lha_home = home.path().to_path_buf();
        config.features.enable(Feature::MemoryTool);

        let outcome = init_if_enabled(&config, None).await;
        assert!(outcome.is_none());
    }

    /// A dash-separated cursor timestamp is parsed as UTC with nanoseconds zeroed.
    #[test]
    fn cursor_to_anchor_normalizes_timestamp_format() {
        let uuid = Uuid::new_v4();
        let raw_ts = "2026-01-27T12-34-56";
        let token = format!("{raw_ts}|{uuid}");

        let cursor = parse_cursor(token.as_str()).expect("cursor should parse");
        let anchor = cursor_to_anchor(Some(&cursor)).expect("anchor should parse");

        let parsed =
            NaiveDateTime::parse_from_str(raw_ts, "%Y-%m-%dT%H-%M-%S").expect("ts should parse");
        let expected_ts = DateTime::<Utc>::from_naive_utc_and_offset(parsed, Utc)
            .with_nanosecond(0)
            .expect("nanosecond");

        assert_eq!(anchor.id, uuid);
        assert_eq!(anchor.ts, expected_ts);
    }
}