1pub mod error;
16pub mod helpers;
17pub mod routes;
18pub mod sse;
19pub mod state;
20pub mod watcher;
21pub mod ws;
22
23pub use error::{AppError, AppResult};
24pub use state::{AppState, MetadataCache};
25
26use axum::{
27 routing::{delete, get, post},
28 Router,
29};
30use flow_core::AgentConfig;
31use std::sync::Arc;
32use tokio::sync::{broadcast, RwLock};
33use tower_http::services::ServeDir;
34use tracing::info;
35
36pub fn build_router(state: Arc<AppState>) -> Router {
38 let mut app = Router::new()
39 .route("/api/sessions", get(routes::sessions::list_sessions))
40 .route(
41 "/api/sessions/:session_id",
42 get(routes::sessions::get_session),
43 )
44 .route("/api/tasks/all", get(routes::tasks::get_all_tasks))
45 .route(
46 "/api/tasks/:session_id/:task_id/note",
47 post(routes::tasks::add_note),
48 )
49 .route(
50 "/api/tasks/:session_id/:task_id",
51 delete(routes::tasks::delete_task),
52 )
53 .route("/api/events", get(sse::sse_handler))
54 .route("/api/ws", get(ws::ws_handler))
55 .route("/api/theme", get(routes::theme::get_theme))
56 .route("/api/theme", post(routes::theme::set_theme));
57
58 if state.db.is_some() {
60 app = app.nest("/api/features", routes::features::feature_routes());
61 }
62
63 app.with_state(state)
64}
65
66#[allow(clippy::cognitive_complexity)]
68pub async fn run_server(config: AgentConfig) -> flow_core::Result<()> {
69 let tasks_dir = config.tasks_dir();
70 let projects_dir = config.projects_dir();
71
72 info!("Tasks directory: {}", tasks_dir.display());
73 info!("Projects directory: {}", projects_dir.display());
74
75 let public_dir = config.public_dir.as_ref().map_or_else(
77 || {
78 std::env::current_exe()
80 .ok()
81 .and_then(|p| p.parent().map(std::path::Path::to_path_buf))
82 .map_or_else(
83 || std::path::PathBuf::from("public"),
84 |exe| {
85 let candidate = exe.join("..").join("public");
86 if candidate.exists() {
87 candidate
88 } else {
89 std::path::PathBuf::from("public")
90 }
91 },
92 )
93 },
94 std::clone::Clone::clone,
95 );
96
97 info!("Public directory: {}", public_dir.display());
98
99 let (tx, _) = broadcast::channel::<String>(256);
101
102 let db = None;
104
105 let state = Arc::new(AppState {
106 tasks_dir: tasks_dir.clone(),
107 projects_dir: projects_dir.clone(),
108 tx: tx.clone(),
109 metadata_cache: RwLock::new(MetadataCache::new()),
110 db,
111 });
112
113 let _watchers = watcher::setup_file_watcher(&tasks_dir, &projects_dir, tx);
115
116 let app = build_router(state.clone()).fallback_service(ServeDir::new(&public_dir));
118
119 let addr = std::net::SocketAddr::from(([0, 0, 0, 0], config.port));
120 info!("Server running at http://localhost:{}", config.port);
121
122 if config.open_browser {
124 let url = format!("http://localhost:{}", config.port);
125 tokio::spawn(async move {
126 let _ = open_browser(&url).await;
127 });
128 }
129
130 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
131 if e.kind() == std::io::ErrorKind::AddrInUse {
132 flow_core::FlowError::Io(std::io::Error::new(
133 std::io::ErrorKind::AddrInUse,
134 format!(
135 "Port {} already in use. Try: flow serve --port {}",
136 config.port,
137 config.port + 1
138 ),
139 ))
140 } else {
141 flow_core::FlowError::Io(e)
142 }
143 })?;
144
145 axum::serve(listener, app)
146 .await
147 .map_err(|e| flow_core::FlowError::Io(std::io::Error::other(e)))?;
148
149 Ok(())
150}
151
152async fn open_browser(url: &str) -> Result<(), std::io::Error> {
154 #[cfg(target_os = "macos")]
155 {
156 tokio::process::Command::new("open")
157 .arg(url)
158 .status()
159 .await?;
160 }
161 #[cfg(target_os = "linux")]
162 {
163 tokio::process::Command::new("xdg-open")
164 .arg(url)
165 .status()
166 .await?;
167 }
168 #[cfg(target_os = "windows")]
169 {
170 tokio::process::Command::new("cmd")
171 .args(["/C", "start", "", url])
172 .status()
173 .await?;
174 }
175 Ok(())
176}