pub mod azure_faults;
pub mod confluence_gen;
pub mod embedder;
pub mod jira_gen;
pub mod opts;
pub mod scheduler;
pub mod world;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
pub use opts::SimOpts;
use crate::azure::schema::EmbeddingConfig;
use crate::config::{
AuthConfig, AzureConfig, Config, ConfluenceSourceConfig, JiraSourceConfig, SourceConfig,
SyncConfig,
};
use crate::sim::embedder::SimEmbedder;
use crate::sync::{IndexMode, UiCommand};
use crate::tui::events::QuelchEvent;
/// Personal access token that `sim_config` places in the `DataCenter` auth of
/// both simulated sources (Jira and Confluence).
pub const MOCK_PAT: &str = "mock-pat-token";
/// TUI wiring accepted by [`run`]: the receiver of [`QuelchEvent`]s plus a
/// shared `AtomicU64` whose value is copied into `App::drops` when rendering
/// snapshots (presumably a count of dropped events — confirm with the sender).
pub type TuiInputs = (mpsc::Receiver<QuelchEvent>, Arc<AtomicU64>);
pub async fn run(opts: SimOpts, tui_inputs: Option<TuiInputs>) -> Result<()> {
let cancel = CancellationToken::new();
let listener = tokio::net::TcpListener::bind(SocketAddr::from((
[127, 0, 0, 1],
opts.mock_port.unwrap_or(0),
)))
.await
.context("bind mock port")?;
let mock_addr = listener.local_addr()?;
let mock_cancel = cancel.clone();
let mock_handle = tokio::spawn(async move {
let router = crate::mock::build_router();
let _ = axum::serve(listener, router)
.with_graceful_shutdown(async move { mock_cancel.cancelled().await })
.await;
});
let base = format!("http://{mock_addr}");
tracing::info!(mock = %base, "sim: mock server up");
world::seed(&base, opts.seed).await.context("seed corpus")?;
tracing::info!("sim: starter corpus seeded");
let scheduler_cancel = cancel.clone();
let scheduler_base = base.clone();
let scheduler_rate = opts.rate_multiplier;
let scheduler_seed = opts.seed;
let scheduler_handle = tokio::spawn(async move {
let _ = scheduler::run(
scheduler_base,
scheduler_seed,
scheduler_rate,
scheduler_cancel,
)
.await;
});
let fault_cancel = cancel.clone();
let fault_base = base.clone();
let fault_seed = opts.seed;
let fault_rate = opts.fault_rate;
let fault_handle = tokio::spawn(async move {
let _ = azure_faults::run(fault_base, fault_rate, fault_seed, fault_cancel).await;
});
let config = sim_config(&base);
let embedding = EmbeddingConfig {
dimensions: 8,
vectorizer_json: serde_json::json!({}),
};
let embedder = SimEmbedder::new(8, opts.seed);
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel::<UiCommand>(16);
let state_path = std::env::temp_dir().join(format!("quelch-sim-{}.json", std::process::id()));
let engine_cancel = cancel.clone();
let engine_config = config.clone();
let engine_state_path = state_path.clone();
let engine_handle = tokio::spawn(async move {
let _ = run_engine_loop(
engine_config,
engine_state_path,
embedding,
embedder,
cmd_rx,
engine_cancel,
)
.await;
});
let started = Instant::now();
if let Some(snapshot_path) = opts.snapshot_to.clone() {
let (events_rx, drops) = tui_inputs.ok_or_else(|| {
anyhow::anyhow!(
"snapshot mode requires TUI wiring; run without --no-tui or pass tui_inputs"
)
})?;
let res = run_tui_snapshot(&opts, &base, events_rx, drops).await;
shutdown_all(
&cmd_tx,
&cancel,
engine_handle,
scheduler_handle,
fault_handle,
mock_handle,
)
.await;
let docs = synced_doc_count(&state_path).unwrap_or(0);
println!(
"sim-snapshot: {} frames written to {}, {} docs synced",
opts.snapshot_frames,
snapshot_path.display(),
docs
);
if let Some(threshold) = opts.assert_docs
&& docs < threshold
{
anyhow::bail!("assert_docs failed: only {docs} < {threshold}");
}
return res;
}
if let Some((events_rx, drops)) = tui_inputs {
let prefs_path =
std::env::temp_dir().join(format!("quelch-sim-tui-{}.json", std::process::id()));
let tui_config = config.clone();
let tui_cmd_tx = cmd_tx.clone();
let tui_cancel = cancel.clone();
let tui_handle = tokio::spawn(async move {
let result =
crate::tui::run(tui_config, prefs_path, events_rx, tui_cmd_tx, drops).await;
tui_cancel.cancel();
result
});
tokio::select! {
_ = async {
if let Some(duration) = opts.duration {
tokio::time::sleep(duration).await;
} else {
std::future::pending::<()>().await;
}
} => {
cancel.cancel();
}
_ = cancel.cancelled() => {}
}
shutdown_all(
&cmd_tx,
&cancel,
engine_handle,
scheduler_handle,
fault_handle,
mock_handle,
)
.await;
let _ = tui_handle.await;
let docs = synced_doc_count(&state_path).unwrap_or(0);
println!(
"sim: {:.1}s, {} docs synced",
started.elapsed().as_secs_f32(),
docs
);
if let Some(threshold) = opts.assert_docs
&& docs < threshold
{
anyhow::bail!("assert_docs failed: only {docs} < {threshold}");
}
return Ok(());
}
let ctrl_c_cancel = cancel.clone();
let ctrl_c_cmd_tx = cmd_tx.clone();
tokio::spawn(async move {
if tokio::signal::ctrl_c().await.is_ok() {
let _ = ctrl_c_cmd_tx.send(UiCommand::Shutdown).await;
ctrl_c_cancel.cancel();
}
});
if let Some(duration) = opts.duration {
tokio::select! {
_ = tokio::time::sleep(duration) => cancel.cancel(),
_ = cancel.cancelled() => {}
}
} else {
cancel.cancelled().await;
}
shutdown_all(
&cmd_tx,
&cancel,
engine_handle,
scheduler_handle,
fault_handle,
mock_handle,
)
.await;
let docs = synced_doc_count(&state_path).unwrap_or(0);
println!(
"sim: {:.1}s, {} docs synced",
started.elapsed().as_secs_f32(),
docs
);
if let Some(threshold) = opts.assert_docs
&& docs < threshold
{
anyhow::bail!("assert_docs failed: only {docs} < {threshold}");
}
Ok(())
}
/// Stops every background task started by `run`.
///
/// Order of operations: send `UiCommand::Shutdown` on `cmd_tx` (a send error
/// is ignored), trigger `cancel`, wait for the engine task to finish, then
/// abort the scheduler, fault and mock-server tasks without awaiting them.
async fn shutdown_all(
    cmd_tx: &mpsc::Sender<UiCommand>,
    cancel: &CancellationToken,
    engine_handle: tokio::task::JoinHandle<()>,
    scheduler_handle: tokio::task::JoinHandle<()>,
    fault_handle: tokio::task::JoinHandle<()>,
    mock_handle: tokio::task::JoinHandle<()>,
) {
    // Best effort: the receiver may already be gone.
    drop(cmd_tx.send(UiCommand::Shutdown).await);
    cancel.cancel();

    // Only the engine is awaited; its join result is irrelevant here.
    drop(engine_handle.await);

    for helper_task in [scheduler_handle, fault_handle, mock_handle] {
        helper_task.abort();
    }
}
async fn run_engine_loop(
config: Config,
state_path: PathBuf,
embedding: EmbeddingConfig,
embedder: SimEmbedder,
mut cmd_rx: tokio::sync::mpsc::Receiver<UiCommand>,
cancel: CancellationToken,
) -> Result<()> {
let mut cycle: u64 = 0;
while !cancel.is_cancelled() {
cycle += 1;
tokio::select! {
_ = crate::sync::run_sync_with(
&config,
&state_path,
&embedding,
IndexMode::AutoCreate,
Some(&embedder as &dyn crate::sync::embedder::Embedder),
None,
&mut cmd_rx,
cycle,
) => {}
_ = cancel.cancelled() => break,
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => {}
_ = cancel.cancelled() => break,
}
}
Ok(())
}
/// Builds the `Config` used by the simulator, pointing every endpoint at the
/// mock server rooted at `base`.
///
/// Azure lives under `{base}/azure`, Jira under `{base}/jira` and Confluence
/// under `{base}/confluence`; both sources authenticate with [`MOCK_PAT`] and
/// the sync section uses `SyncConfig::default()`.
fn sim_config(base: &str) -> Config {
    let mock_auth = || AuthConfig::DataCenter {
        pat: MOCK_PAT.into(),
    };

    let azure = AzureConfig {
        endpoint: format!("{base}/azure"),
        api_key: "ignored".into(),
    };

    let jira = JiraSourceConfig {
        name: "sim-jira".into(),
        url: format!("{base}/jira"),
        auth: mock_auth(),
        projects: vec!["QUELCH".into(), "DEMO".into()],
        index: "sim-jira-issues".into(),
    };

    let confluence = ConfluenceSourceConfig {
        name: "sim-confluence".into(),
        url: format!("{base}/confluence"),
        auth: mock_auth(),
        spaces: vec!["QUELCH".into(), "INFRA".into()],
        index: "sim-confluence-pages".into(),
    };

    Config {
        azure,
        sources: vec![
            SourceConfig::Jira(jira),
            SourceConfig::Confluence(confluence),
        ],
        sync: SyncConfig::default(),
    }
}
async fn run_tui_snapshot(
opts: &SimOpts,
base: &str,
events_rx: tokio::sync::mpsc::Receiver<crate::tui::events::QuelchEvent>,
drops: std::sync::Arc<std::sync::atomic::AtomicU64>,
) -> Result<()> {
use ratatui::Terminal;
use ratatui::backend::TestBackend;
use std::io::Write;
let path = opts
.snapshot_to
.as_ref()
.ok_or_else(|| anyhow::anyhow!("snapshot_to not set"))?
.clone();
let frames = opts.snapshot_frames.max(1);
let mut events_rx = events_rx;
let prefs = crate::tui::prefs::Prefs::default();
let config = sim_config(base);
let mut app = crate::tui::app::App::new(&config, prefs);
let backend = TestBackend::new(opts.snapshot_width, opts.snapshot_height);
let mut terminal = Terminal::new(backend)?;
let start = std::time::Instant::now();
let mut file = std::fs::File::create(&path)?;
for frame_idx in 0..frames {
while let Ok(ev) = events_rx.try_recv() {
app.apply(ev);
}
app.tick_spinner();
app.drops = drops.load(std::sync::atomic::Ordering::Relaxed);
terminal.draw(|f| {
crate::tui::layout::draw(f, &app, start.elapsed(), false);
})?;
let buf = terminal.backend().buffer();
writeln!(
file,
"===== FRAME {frame_idx} (uptime {:.2}s) =====",
start.elapsed().as_secs_f32()
)?;
for y in 0..buf.area.height {
let line: String = (0..buf.area.width)
.map(|x| buf[(x, y)].symbol())
.collect::<String>();
writeln!(file, "{line}")?;
}
writeln!(file)?;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
Ok(())
}
/// Sums every `sources.*.subsources.*.documents_synced` value found in the
/// JSON state file at `state_path`.
///
/// Missing keys, non-object containers and non-`u64` counters are skipped, so
/// a well-formed file without any of them yields `0`.
///
/// # Errors
///
/// Fails when the file cannot be read or is not valid JSON.
fn synced_doc_count(state_path: &std::path::Path) -> Result<u64> {
    let contents = std::fs::read_to_string(state_path)?;
    let state: serde_json::Value = serde_json::from_str(&contents)?;

    let total: u64 = state
        .get("sources")
        .and_then(serde_json::Value::as_object)
        .into_iter()
        .flat_map(|sources| sources.values())
        .filter_map(|source| {
            source
                .get("subsources")
                .and_then(serde_json::Value::as_object)
        })
        .flat_map(|subsources| subsources.values())
        .filter_map(|subsource| {
            subsource
                .get("documents_synced")
                .and_then(serde_json::Value::as_u64)
        })
        .sum();

    Ok(total)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Options for a headless run of the given length: fixed seed, no fault
    /// injection, no doc-count assertion, OS-assigned mock port, no snapshot.
    fn headless_opts(duration: Duration) -> SimOpts {
        SimOpts {
            duration: Some(duration),
            seed: Some(42),
            rate_multiplier: 5.0,
            fault_rate: 0.0,
            assert_docs: None,
            mock_port: None,
            snapshot_to: None,
            snapshot_frames: 10,
            snapshot_width: 120,
            snapshot_height: 40,
        }
    }

    /// A half-second headless run must start, stop and return `Ok`.
    #[tokio::test]
    async fn short_run_succeeds() {
        let outcome = run(headless_opts(Duration::from_millis(500)), None).await;
        outcome.unwrap();
    }
}