darq 0.1.0

darq CLI + TUI — autonomous issue → PR pipeline with SAT and a learning loop.
Documentation
//! JSONL replay source for the TUI.
//!
//! Reads a captured `DaemonEvent` stream from a JSONL file and feeds it into the same
//! `TuiEvent::DaemonEvent` channel the live socket subscriber uses. Preserves inter-event
//! wall-clock spacing, scaled by `speed` (e.g. 4.0 = 4× faster than real time).
//!
//! Every line in the file is expected to be a JSON object matching `DaemonEvent`:
//! `{"event":"run_event","data":{...}}`. Lines that fail to parse are skipped with a
//! warn-level trace.

use std::path::PathBuf;
use std::time::Duration;

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::mpsc;

use crate::daemon::client::DaemonEvent;

/// Spawn a task that reads `path` line-by-line, paces events by their timestamps
/// (scaled by `speed`), and pushes them into `tx`. Exits when the file is exhausted
/// or the receiver is dropped. Sends an ack-style log event at the end so the TUI
/// can surface "replay complete".
pub async fn run(path: PathBuf, speed: f64, tx: mpsc::UnboundedSender<DaemonEvent>) -> Result<()> {
    let file = File::open(&path)
        .await
        .with_context(|| format!("opening replay file {}", path.display()))?;
    let mut lines = BufReader::new(file).lines();

    // We pace events relative to the first one: the moment we see the first event,
    // we peg its timestamp to `now` and every subsequent event waits
    // `(event.ts - first.ts) / speed` from that reference.
    let mut anchor: Option<(DateTime<Utc>, std::time::Instant)> = None;
    let mut count = 0usize;

    while let Some(line) = lines.next_line().await.context("reading replay line")? {
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }

        // Parse the full daemon event.
        let event: DaemonEvent = match serde_json::from_str(trimmed) {
            Ok(e) => e,
            Err(e) => {
                tracing::warn!(error = %e, "replay: skipping malformed line");
                continue;
            }
        };

        // Extract timestamp from the inner RunEvent (all variants have `timestamp`).
        let ts = event
            .data
            .get("timestamp")
            .and_then(|v| v.as_str())
            .and_then(|s| s.parse::<DateTime<Utc>>().ok());

        if let Some(ts) = ts {
            match anchor {
                None => {
                    anchor = Some((ts, std::time::Instant::now()));
                }
                Some((first_ts, started)) => {
                    let event_offset = (ts - first_ts).to_std().unwrap_or(Duration::ZERO);
                    // Scale the offset by speed.
                    let scaled = event_offset.div_f64(speed.max(0.01));
                    let target = started + scaled;
                    let now = std::time::Instant::now();
                    if target > now {
                        tokio::time::sleep(target - now).await;
                    }
                }
            }
        }

        if tx.send(event).is_err() {
            tracing::debug!("replay: TUI disconnected, stopping");
            return Ok(());
        }
        count += 1;
    }

    tracing::info!(count, "replay complete");

    // Send a final synthetic "replay_complete" event so the TUI can optionally
    // render a banner. Uses the `log` RunEvent variant (already supported).
    let final_event = DaemonEvent {
        event: "run_event".into(),
        data: serde_json::json!({
            "type": "log",
            "run_id": "replay",
            "level": "info",
            "message": format!("replay complete · {count} events replayed"),
            "timestamp": Utc::now(),
        }),
    };
    let _ = tx.send(final_event);

    Ok(())
}