pub mod api;
mod assets;
pub mod state;
mod watcher;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, Result};
use axum::Router;
use tower_http::cors::CorsLayer;
use tower_http::limit::RequestBodyLimitLayer;
pub use state::{AppState, ServerConfig};
pub async fn run_server(config: ServerConfig) -> Result<()> {
let state = Arc::new(AppState::new(config.clone()).await?);
let watcher_state = Arc::clone(&state);
let watcher_handle = tokio::spawn(async move {
if let Err(e) = watcher::start_watcher(watcher_state).await {
eprintln!("flowscope: watcher error: {e}");
}
});
let app = build_router(state, config.port);
let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
let listener = tokio::net::TcpListener::bind(addr)
.await
.context("Failed to bind to address")?;
println!("flowscope: server listening on http://{addr}");
if config.open_browser {
let url = format!("http://localhost:{}", config.port);
if let Err(e) = open::that(&url) {
eprintln!("flowscope: warning: failed to open browser: {e}");
}
}
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.context("Server error")?;
watcher_handle.abort();
println!("\nflowscope: server stopped");
Ok(())
}
/// Maximum accepted HTTP request body size in bytes (100 MiB), passed to the
/// `RequestBodyLimitLayer` installed by `build_router`.
const MAX_REQUEST_BODY_SIZE: usize = 100 * 1024 * 1024;
/// Assembles the application router.
///
/// API routes are nested under `/api`; every other path falls through to the
/// static asset handler. CORS is restricted to the `localhost` and
/// `127.0.0.1` origins on `port`, allowing `GET`, `POST` and `OPTIONS` with
/// the `Content-Type` header. Request bodies are capped at
/// `MAX_REQUEST_BODY_SIZE`.
///
/// # Panics
///
/// Panics if an origin string fails to parse as a header value. The origins
/// are built from fixed text plus a `u16`, so this is not expected to happen.
pub fn build_router(state: Arc<AppState>, port: u16) -> Router {
    use axum::http::{header::CONTENT_TYPE, HeaderValue, Method};

    let localhost_origin: HeaderValue = format!("http://localhost:{port}").parse().unwrap();
    let loopback_origin: HeaderValue = format!("http://127.0.0.1:{port}").parse().unwrap();

    let cors_layer = CorsLayer::new()
        .allow_origin([localhost_origin, loopback_origin])
        .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
        .allow_headers([CONTENT_TYPE]);
    let body_limit_layer = RequestBodyLimitLayer::new(MAX_REQUEST_BODY_SIZE);

    let routes: Router = Router::new()
        .nest("/api", api::api_routes())
        .fallback(assets::static_handler)
        .with_state(state);

    // Layer order matters: the body limit is added last, so it wraps CORS.
    routes.layer(cors_layer).layer(body_limit_layer)
}
/// Resolves when the process is asked to shut down.
///
/// Completes on Ctrl+C or, on Unix, on SIGTERM, whichever arrives first. If a
/// handler cannot be installed, a warning is printed and that source never
/// fires, leaving the other one as the only trigger. On non-Unix targets only
/// Ctrl+C is observed.
async fn shutdown_signal() {
    let interrupt = async {
        match tokio::signal::ctrl_c().await {
            Ok(()) => {}
            Err(e) => {
                eprintln!("flowscope: warning: failed to install Ctrl+C handler: {e}");
                std::future::pending::<()>().await;
            }
        }
    };

    #[cfg(unix)]
    let terminate = async {
        use tokio::signal::unix::{signal, SignalKind};

        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(stream) => stream,
            Err(e) => {
                eprintln!("flowscope: warning: failed to install SIGTERM handler: {e}");
                // Never resolve, so this branch cannot trigger a shutdown.
                return std::future::pending::<()>().await;
            }
        };
        sigterm.recv().await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = interrupt => {},
        _ = terminate => {},
    }
}
/// Largest SQL file, in bytes (10 MiB), that `scan_sql_files` will load;
/// bigger files are skipped with a warning.
const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
/// Number of collected files at which `scan_sql_files` stops scanning and
/// returns what it has gathered so far.
const MAX_TOTAL_FILES: usize = 10_000;
/// Recursively collects `.sql` files from the given watch directories.
///
/// Returns the loaded sources together with a map from each loaded file's
/// path (as yielded by the directory walk) to its modification time, when the
/// platform reports one.
///
/// Source names are paths relative to their watch directory. When more than
/// one directory is given, each name is prefixed with a per-directory label
/// (see [`dir_name_prefixes`]) so files from different roots stay distinct.
///
/// Directories that do not exist and files larger than `MAX_FILE_SIZE` are
/// skipped with a warning on stderr. Entries the walk fails to read are
/// skipped silently. Symbolic links are not followed while walking. Scanning
/// stops early, with a warning, once `MAX_TOTAL_FILES` sources are collected.
///
/// # Errors
///
/// Returns an error if a candidate file's metadata or contents cannot be
/// read, or if a walked path is not located under its watch directory.
pub fn scan_sql_files(
    dirs: &[PathBuf],
) -> Result<(
    Vec<flowscope_core::FileSource>,
    std::collections::HashMap<PathBuf, std::time::SystemTime>,
)> {
    use std::fs;

    let mut sources = Vec::new();
    let mut mtimes = std::collections::HashMap::new();

    for (dir, prefix) in dirs.iter().zip(dir_name_prefixes(dirs)) {
        if !dir.exists() {
            eprintln!(
                "flowscope: warning: watch directory does not exist: {}",
                dir.display()
            );
            continue;
        }

        // Lazily yields only regular files with a `.sql` extension; entries
        // that fail to be read during the walk are dropped.
        let sql_entries = walkdir::WalkDir::new(dir)
            .follow_links(false)
            .into_iter()
            .filter_map(Result::ok)
            .filter(|candidate| {
                let candidate_path = candidate.path();
                candidate_path.is_file()
                    && candidate_path.extension().is_some_and(|ext| ext == "sql")
            });

        for entry in sql_entries {
            let path = entry.path();

            let metadata = fs::metadata(path)
                .with_context(|| format!("Failed to read metadata for {}", path.display()))?;
            if metadata.len() > MAX_FILE_SIZE {
                eprintln!(
                    "flowscope: warning: skipping large file (>10MB): {}",
                    path.display()
                );
                continue;
            }

            let content = fs::read_to_string(path)
                .with_context(|| format!("Failed to read {}", path.display()))?;

            let relative_str = path
                .strip_prefix(dir)
                .with_context(|| format!("File outside watch directory: {}", path.display()))?
                .to_string_lossy();
            let name = match &prefix {
                Some(prefix) => format!("{prefix}/{}", relative_str),
                None => relative_str.into_owned(),
            };

            sources.push(flowscope_core::FileSource { name, content });
            // A missing modification time is tolerated: the file is still
            // returned, it just has no entry in the mtime map.
            if let Ok(mtime) = metadata.modified() {
                mtimes.insert(path.to_path_buf(), mtime);
            }

            if sources.len() >= MAX_TOTAL_FILES {
                eprintln!(
                    "flowscope: warning: reached file limit ({}), skipping remaining files",
                    MAX_TOTAL_FILES
                );
                return Ok((sources, mtimes));
            }
        }
    }

    Ok((sources, mtimes))
}

/// Computes the name prefix used for each watch directory, in input order.
///
/// With zero or one directory no prefix is used (`None`). With several, each
/// directory is labelled by its final path component, falling back to the
/// full displayed path when there is none. The first occurrence of a label is
/// used as-is; later occurrences get a `#2`, `#3`, ... suffix.
fn dir_name_prefixes(dirs: &[PathBuf]) -> Vec<Option<String>> {
    if dirs.len() <= 1 {
        return vec![None; dirs.len()];
    }

    let mut occurrences: std::collections::HashMap<String, usize> =
        std::collections::HashMap::new();
    dirs.iter()
        .map(|dir| {
            let base = match dir.file_name() {
                Some(component) => component.to_string_lossy().to_string(),
                None => dir.display().to_string(),
            };
            let entry = occurrences.entry(base.clone()).or_insert(0);
            *entry += 1;
            // A label seen for the first time is never suffixed, whether or
            // not duplicates follow later in the list.
            Some(if *entry == 1 {
                base
            } else {
                format!("{base}#{}", *entry)
            })
        })
        .collect()
}