flowscope_cli/server/
mod.rs1pub mod api;
7mod assets;
8pub mod state;
9mod watcher;
10
11use std::net::SocketAddr;
12use std::path::PathBuf;
13use std::sync::Arc;
14
15use anyhow::{Context, Result};
16use axum::Router;
17use tower_http::cors::CorsLayer;
18use tower_http::limit::RequestBodyLimitLayer;
19
20pub use state::{AppState, ServerConfig};
21
22pub async fn run_server(config: ServerConfig) -> Result<()> {
26 let state = Arc::new(AppState::new(config.clone()).await?);
27
28 let watcher_state = Arc::clone(&state);
30 let watcher_handle = tokio::spawn(async move {
31 if let Err(e) = watcher::start_watcher(watcher_state).await {
32 eprintln!("flowscope: watcher error: {e}");
33 }
34 });
35
36 let app = build_router(state, config.port);
37
38 let addr = SocketAddr::from(([127, 0, 0, 1], config.port));
39
40 let listener = tokio::net::TcpListener::bind(addr)
42 .await
43 .context("Failed to bind to address")?;
44
45 println!("flowscope: server listening on http://{addr}");
46
47 if config.open_browser {
49 let url = format!("http://localhost:{}", config.port);
50 if let Err(e) = open::that(&url) {
51 eprintln!("flowscope: warning: failed to open browser: {e}");
52 }
53 }
54
55 axum::serve(listener, app)
56 .with_graceful_shutdown(shutdown_signal())
57 .await
58 .context("Server error")?;
59
60 watcher_handle.abort();
61 println!("\nflowscope: server stopped");
62
63 Ok(())
64}
65
66const MAX_REQUEST_BODY_SIZE: usize = 100 * 1024 * 1024;
78
79pub fn build_router(state: Arc<AppState>, port: u16) -> Router {
81 let allowed_origins = [
85 format!("http://localhost:{port}").parse().unwrap(),
86 format!("http://127.0.0.1:{port}").parse().unwrap(),
87 ];
88
89 let cors = CorsLayer::new()
90 .allow_origin(allowed_origins)
91 .allow_methods([
92 axum::http::Method::GET,
93 axum::http::Method::POST,
94 axum::http::Method::OPTIONS,
95 ])
96 .allow_headers([axum::http::header::CONTENT_TYPE]);
97
98 Router::new()
99 .nest("/api", api::api_routes())
100 .fallback(assets::static_handler)
101 .with_state(state)
102 .layer(cors)
103 .layer(RequestBodyLimitLayer::new(MAX_REQUEST_BODY_SIZE))
104}
105
106async fn shutdown_signal() {
114 let ctrl_c = async {
115 if let Err(e) = tokio::signal::ctrl_c().await {
116 eprintln!("flowscope: warning: failed to install Ctrl+C handler: {e}");
117 std::future::pending::<()>().await;
119 }
120 };
121
122 #[cfg(unix)]
123 let terminate = async {
124 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
125 Ok(mut signal) => {
126 signal.recv().await;
127 }
128 Err(e) => {
129 eprintln!("flowscope: warning: failed to install SIGTERM handler: {e}");
130 std::future::pending::<()>().await;
132 }
133 }
134 };
135
136 #[cfg(not(unix))]
137 let terminate = std::future::pending::<()>();
138
139 tokio::select! {
140 _ = ctrl_c => {},
141 _ = terminate => {},
142 }
143}
144
145const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
151
152const MAX_TOTAL_FILES: usize = 10_000;
158
159pub fn scan_sql_files(
161 dirs: &[PathBuf],
162) -> Result<(
163 Vec<flowscope_core::FileSource>,
164 std::collections::HashMap<PathBuf, std::time::SystemTime>,
165)> {
166 use std::fs;
167
168 let mut sources = Vec::new();
169 let mut mtimes = std::collections::HashMap::new();
170
171 let mut base_labels = Vec::with_capacity(dirs.len());
174 for dir in dirs {
175 let base = dir
176 .file_name()
177 .map(|name| name.to_string_lossy().to_string())
178 .unwrap_or_else(|| dir.display().to_string());
179 base_labels.push(base);
180 }
181
182 let mut label_counts = std::collections::HashMap::new();
183 for base in &base_labels {
184 *label_counts.entry(base.clone()).or_insert(0) += 1;
185 }
186
187 let multi_root = dirs.len() > 1;
188 let mut seen_counts = std::collections::HashMap::new();
189 let dir_prefixes: Vec<Option<String>> = base_labels
190 .iter()
191 .map(|base| {
192 if !multi_root {
193 return None;
194 }
195
196 let total = label_counts.get(base).copied().unwrap_or(1);
197 if total == 1 {
198 return Some(base.clone());
199 }
200
201 let entry = seen_counts.entry(base.clone()).or_insert(0);
202 *entry += 1;
203 if *entry == 1 {
204 Some(base.clone())
205 } else {
206 Some(format!("{base}#{}", *entry))
207 }
208 })
209 .collect();
210
211 for (dir, prefix) in dirs.iter().zip(dir_prefixes.iter()) {
212 if !dir.exists() {
213 eprintln!(
214 "flowscope: warning: watch directory does not exist: {}",
215 dir.display()
216 );
217 continue;
218 }
219
220 for entry in walkdir::WalkDir::new(dir)
223 .follow_links(false)
224 .into_iter()
225 .filter_map(|e| e.ok())
226 {
227 let path = entry.path();
228 if path.is_file() && path.extension().is_some_and(|ext| ext == "sql") {
229 let metadata = fs::metadata(path)
231 .with_context(|| format!("Failed to read metadata for {}", path.display()))?;
232
233 if metadata.len() > MAX_FILE_SIZE {
234 eprintln!(
235 "flowscope: warning: skipping large file (>10MB): {}",
236 path.display()
237 );
238 continue;
239 }
240
241 let content = fs::read_to_string(path)
242 .with_context(|| format!("Failed to read {}", path.display()))?;
243
244 let relative_path = path
247 .strip_prefix(dir)
248 .with_context(|| format!("File outside watch directory: {}", path.display()))?;
249
250 let relative_str = relative_path.to_string_lossy();
251 let name = if let Some(prefix) = prefix {
252 format!("{prefix}/{}", relative_str)
253 } else {
254 relative_str.to_string()
255 };
256 sources.push(flowscope_core::FileSource { name, content });
257
258 if let Ok(mtime) = metadata.modified() {
260 mtimes.insert(path.to_path_buf(), mtime);
261 }
262
263 if sources.len() >= MAX_TOTAL_FILES {
265 eprintln!(
266 "flowscope: warning: reached file limit ({}), skipping remaining files",
267 MAX_TOTAL_FILES
268 );
269 return Ok((sources, mtimes));
270 }
271 }
272 }
273 }
274
275 Ok((sources, mtimes))
276}