1use std::collections::HashMap;
4use std::convert::Infallible;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use axum::extract::{MatchedPath, Path, Query, State};
9use axum::http::StatusCode;
10use axum::response::sse::{Event, Sse};
11use axum::response::{Html, IntoResponse, Response};
12use axum::routing::{get, post};
13use axum::Router;
14use futures_core::Stream;
15use tokio::net::TcpListener;
16use tokio::task::JoinSet;
17use tokio_stream::StreamExt;
18
19use crate::errors::SeamError;
20use crate::manifest::build_manifest;
21use crate::page::PageDef;
22use crate::procedure::{ProcedureDef, SubscriptionDef};
23
24struct AppState {
25 manifest_json: serde_json::Value,
26 handlers: HashMap<String, Arc<ProcedureDef>>,
27 subscriptions: HashMap<String, Arc<SubscriptionDef>>,
28 pages: HashMap<String, Arc<PageDef>>,
29}
30
31pub struct SeamServer {
32 procedures: Vec<ProcedureDef>,
33 subscriptions: Vec<SubscriptionDef>,
34 pages: Vec<PageDef>,
35}
36
37impl SeamServer {
38 pub fn new() -> Self {
39 Self { procedures: Vec::new(), subscriptions: Vec::new(), pages: Vec::new() }
40 }
41
42 pub fn procedure(mut self, proc: ProcedureDef) -> Self {
43 self.procedures.push(proc);
44 self
45 }
46
47 pub fn subscription(mut self, sub: SubscriptionDef) -> Self {
48 self.subscriptions.push(sub);
49 self
50 }
51
52 pub fn page(mut self, page: PageDef) -> Self {
53 self.pages.push(page);
54 self
55 }
56
57 pub fn into_router(self) -> Router {
58 let manifest = build_manifest(&self.procedures, &self.subscriptions);
59 let manifest_json = serde_json::to_value(&manifest).expect("manifest serialization");
60
61 let handlers: HashMap<String, Arc<ProcedureDef>> =
62 self.procedures.into_iter().map(|p| (p.name.clone(), Arc::new(p))).collect();
63
64 let subscriptions: HashMap<String, Arc<SubscriptionDef>> =
65 self.subscriptions.into_iter().map(|s| (s.name.clone(), Arc::new(s))).collect();
66
67 let mut pages = HashMap::new();
68 let mut router = Router::new()
69 .route("/_seam/manifest.json", get(handle_manifest))
70 .route("/_seam/rpc/{name}", post(handle_rpc))
71 .route("/_seam/subscribe/{name}", get(handle_subscribe));
72
73 for page in self.pages {
74 let full_route = format!("/_seam/page{}", page.route);
75 pages.insert(full_route.clone(), Arc::new(page));
76 router = router.route(&full_route, get(handle_page));
77 }
78
79 let state = Arc::new(AppState { manifest_json, handlers, subscriptions, pages });
80
81 router.with_state(state)
82 }
83
84 pub async fn serve(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
85 let router = self.into_router();
86 let listener = TcpListener::bind(addr).await?;
87 let local_addr = listener.local_addr()?;
88 println!("Seam Rust backend running on http://localhost:{}", local_addr.port());
89 axum::serve(listener, router).await?;
90 Ok(())
91 }
92}
93
94impl Default for SeamServer {
95 fn default() -> Self {
96 Self::new()
97 }
98}
99
100impl IntoResponse for SeamError {
101 fn into_response(self) -> Response {
102 let status = StatusCode::from_u16(self.status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
103 let body = serde_json::json!({
104 "error": {
105 "code": self.code(),
106 "message": self.message(),
107 }
108 });
109 (status, axum::Json(body)).into_response()
110 }
111}
112
113async fn handle_manifest(State(state): State<Arc<AppState>>) -> impl IntoResponse {
114 axum::Json(state.manifest_json.clone())
115}
116
117async fn handle_rpc(
118 State(state): State<Arc<AppState>>,
119 Path(name): Path<String>,
120 body: axum::body::Bytes,
121) -> Result<impl IntoResponse, SeamError> {
122 let proc = state
123 .handlers
124 .get(&name)
125 .ok_or_else(|| SeamError::not_found(format!("Procedure '{name}' not found")))?;
126
127 let input: serde_json::Value =
128 serde_json::from_slice(&body).map_err(|e| SeamError::validation(e.to_string()))?;
129
130 let result = (proc.handler)(input).await?;
131 Ok(axum::Json(result))
132}
133
134#[derive(serde::Deserialize)]
135struct SubscribeQuery {
136 input: Option<String>,
137}
138
139async fn handle_subscribe(
140 State(state): State<Arc<AppState>>,
141 Path(name): Path<String>,
142 Query(query): Query<SubscribeQuery>,
143) -> Sse<Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>> {
144 let setup = async {
145 let sub = state
146 .subscriptions
147 .get(&name)
148 .ok_or_else(|| SeamError::not_found(format!("Subscription '{name}' not found")))?;
149
150 let raw_input = match &query.input {
151 Some(s) => serde_json::from_str(s).map_err(|e| SeamError::validation(e.to_string()))?,
152 None => serde_json::Value::Object(serde_json::Map::new()),
153 };
154
155 let data_stream = (sub.handler)(raw_input).await?;
156 Ok::<_, SeamError>(data_stream)
157 };
158
159 match setup.await {
160 Ok(data_stream) => {
161 let event_stream = data_stream
162 .map(|item| match item {
163 Ok(value) => {
164 let data = serde_json::to_string(&value).unwrap_or_default();
165 Ok(Event::default().event("data").data(data))
166 }
167 Err(e) => {
168 let payload = serde_json::json!({ "code": e.code(), "message": e.message() });
169 Ok(Event::default().event("error").data(payload.to_string()))
170 }
171 })
172 .chain(tokio_stream::once(Ok(Event::default().event("complete").data("{}"))));
173 Sse::new(Box::pin(event_stream))
174 }
175 Err(err) => {
176 let payload = serde_json::json!({ "code": err.code(), "message": err.message() });
177 let error_event = Event::default().event("error").data(payload.to_string());
178 Sse::new(Box::pin(tokio_stream::once(Ok(error_event))))
179 }
180 }
181}
182
183async fn handle_page(
184 State(state): State<Arc<AppState>>,
185 matched: MatchedPath,
186 Path(params): Path<HashMap<String, String>>,
187) -> Result<Html<String>, SeamError> {
188 let route_pattern = matched.as_str().to_string();
189 let page =
190 state.pages.get(&route_pattern).ok_or_else(|| SeamError::not_found("Page not found"))?;
191
192 let mut join_set = JoinSet::new();
193
194 let handlers = state.handlers.clone();
195 for loader in &page.loaders {
196 let input = (loader.input_fn)(¶ms);
197 let proc_name = loader.procedure.clone();
198 let data_key = loader.data_key.clone();
199 let handlers = handlers.clone();
200
201 join_set.spawn(async move {
202 let proc = handlers
203 .get(&proc_name)
204 .ok_or_else(|| SeamError::internal(format!("Procedure '{proc_name}' not found")))?;
205 let result = (proc.handler)(input).await?;
206 Ok::<(String, serde_json::Value), SeamError>((data_key, result))
207 });
208 }
209
210 let mut data = serde_json::Map::new();
211 while let Some(result) = join_set.join_next().await {
212 let (key, value) = result
213 .map_err(|e| SeamError::internal(e.to_string()))? ?; data.insert(key, value);
216 }
217
218 let html = seam_injector::inject(&page.template, &serde_json::Value::Object(data));
219 Ok(Html(html))
220}