1use std::collections::HashMap;
4use std::convert::Infallible;
5use std::sync::Arc;
6
7use axum::extract::{MatchedPath, Path, Query, State};
8use axum::response::sse::{Event, Sse};
9use axum::response::{Html, IntoResponse};
10use axum::routing::{get, post};
11use axum::Router;
12use futures_core::Stream;
13use tokio::net::TcpListener;
14use tokio::task::JoinSet;
15use tokio_stream::StreamExt;
16
17use crate::errors::SeamError;
18use crate::injector;
19use crate::manifest::build_manifest;
20use crate::page::PageDef;
21use crate::procedure::{ProcedureDef, SubscriptionDef};
22
/// Shared, read-only state handed to every request handler through axum's `State` extractor.
///
/// Built once in `SeamServer::into_router` and wrapped in an `Arc` there.
struct AppState {
    /// JSON form of the manifest built from the registered procedures and subscriptions;
    /// returned as-is by the manifest endpoint.
    manifest_json: serde_json::Value,
    /// Procedure definitions keyed by their `name`.
    handlers: HashMap<String, Arc<ProcedureDef>>,
    /// Subscription definitions keyed by their `name`.
    subscriptions: HashMap<String, Arc<SubscriptionDef>>,
    /// Page definitions keyed by their full route pattern (`/_seam/page` + the page's `route`).
    pages: HashMap<String, Arc<PageDef>>,
}
29
/// Builder that collects procedure, subscription, and page definitions before they are
/// turned into an axum `Router` (see `into_router`) or served directly (see `serve`).
pub struct SeamServer {
    /// Procedures registered so far, in registration order.
    procedures: Vec<ProcedureDef>,
    /// Subscriptions registered so far, in registration order.
    subscriptions: Vec<SubscriptionDef>,
    /// Pages registered so far, in registration order.
    pages: Vec<PageDef>,
}
35
36impl SeamServer {
37 pub fn new() -> Self {
38 Self { procedures: Vec::new(), subscriptions: Vec::new(), pages: Vec::new() }
39 }
40
41 pub fn procedure(mut self, proc: ProcedureDef) -> Self {
42 self.procedures.push(proc);
43 self
44 }
45
46 pub fn subscription(mut self, sub: SubscriptionDef) -> Self {
47 self.subscriptions.push(sub);
48 self
49 }
50
51 pub fn page(mut self, page: PageDef) -> Self {
52 self.pages.push(page);
53 self
54 }
55
56 pub fn into_router(self) -> Router {
57 let manifest = build_manifest(&self.procedures, &self.subscriptions);
58 let manifest_json = serde_json::to_value(&manifest).expect("manifest serialization");
59
60 let handlers: HashMap<String, Arc<ProcedureDef>> =
61 self.procedures.into_iter().map(|p| (p.name.clone(), Arc::new(p))).collect();
62
63 let subscriptions: HashMap<String, Arc<SubscriptionDef>> =
64 self.subscriptions.into_iter().map(|s| (s.name.clone(), Arc::new(s))).collect();
65
66 let mut pages = HashMap::new();
67 let mut router = Router::new()
68 .route("/_seam/manifest.json", get(handle_manifest))
69 .route("/_seam/rpc/{name}", post(handle_rpc))
70 .route("/_seam/subscribe/{name}", get(handle_subscribe));
71
72 for page in self.pages {
73 let full_route = format!("/_seam/page{}", page.route);
74 pages.insert(full_route.clone(), Arc::new(page));
75 router = router.route(&full_route, get(handle_page));
76 }
77
78 let state = Arc::new(AppState { manifest_json, handlers, subscriptions, pages });
79
80 router.with_state(state)
81 }
82
83 pub async fn serve(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
84 let router = self.into_router();
85 let listener = TcpListener::bind(addr).await?;
86 println!("Seam Rust backend running on http://{addr}");
87 axum::serve(listener, router).await?;
88 Ok(())
89 }
90}
91
92impl Default for SeamServer {
93 fn default() -> Self {
94 Self::new()
95 }
96}
97
98async fn handle_manifest(State(state): State<Arc<AppState>>) -> impl IntoResponse {
99 axum::Json(state.manifest_json.clone())
100}
101
102async fn handle_rpc(
103 State(state): State<Arc<AppState>>,
104 Path(name): Path<String>,
105 body: axum::body::Bytes,
106) -> Result<impl IntoResponse, SeamError> {
107 let proc = state
108 .handlers
109 .get(&name)
110 .ok_or_else(|| SeamError::not_found(format!("Procedure '{name}' not found")))?;
111
112 let input: serde_json::Value =
113 serde_json::from_slice(&body).map_err(|e| SeamError::validation(e.to_string()))?;
114
115 let result = (proc.handler)(input).await?;
116 Ok(axum::Json(result))
117}
118
/// Query-string parameters accepted by the subscribe endpoint.
#[derive(serde::Deserialize)]
struct SubscribeQuery {
    /// Optional subscription input as a JSON-encoded string; when absent the handler
    /// receives an empty JSON object.
    input: Option<String>,
}
123
124async fn handle_subscribe(
125 State(state): State<Arc<AppState>>,
126 Path(name): Path<String>,
127 Query(query): Query<SubscribeQuery>,
128) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, SeamError> {
129 let sub = state
130 .subscriptions
131 .get(&name)
132 .ok_or_else(|| SeamError::not_found(format!("Subscription '{name}' not found")))?;
133
134 let raw_input = match &query.input {
135 Some(s) => serde_json::from_str(s).map_err(|e| SeamError::validation(e.to_string()))?,
136 None => serde_json::Value::Object(serde_json::Map::new()),
137 };
138
139 let data_stream = (sub.handler)(raw_input).await?;
140
141 let event_stream = data_stream
142 .map(|item| match item {
143 Ok(value) => {
144 let data = serde_json::to_string(&value).unwrap_or_default();
145 Ok(Event::default().event("data").data(data))
146 }
147 Err(e) => {
148 let payload = serde_json::json!({ "code": e.to_string().split(':').next().unwrap_or("INTERNAL_ERROR"), "message": e.to_string() });
149 Ok(Event::default().event("error").data(payload.to_string()))
150 }
151 })
152 .chain(tokio_stream::once(Ok(Event::default().event("complete").data("{}"))));
153
154 Ok(Sse::new(event_stream))
155}
156
157async fn handle_page(
158 State(state): State<Arc<AppState>>,
159 matched: MatchedPath,
160 Path(params): Path<HashMap<String, String>>,
161) -> Result<Html<String>, SeamError> {
162 let route_pattern = matched.as_str().to_string();
163 let page =
164 state.pages.get(&route_pattern).ok_or_else(|| SeamError::not_found("Page not found"))?;
165
166 let mut join_set = JoinSet::new();
167
168 let handlers = state.handlers.clone();
169 for loader in &page.loaders {
170 let input = (loader.input_fn)(¶ms);
171 let proc_name = loader.procedure.clone();
172 let data_key = loader.data_key.clone();
173 let handlers = handlers.clone();
174
175 join_set.spawn(async move {
176 let proc = handlers
177 .get(&proc_name)
178 .ok_or_else(|| SeamError::internal(format!("Procedure '{proc_name}' not found")))?;
179 let result = (proc.handler)(input).await?;
180 Ok::<(String, serde_json::Value), SeamError>((data_key, result))
181 });
182 }
183
184 let mut data = serde_json::Map::new();
185 while let Some(result) = join_set.join_next().await {
186 let (key, value) = result
187 .map_err(|e| SeamError::internal(e.to_string()))? ?; data.insert(key, value);
190 }
191
192 let html = injector::inject(&page.template, &serde_json::Value::Object(data));
193 Ok(Html(html))
194}