Skip to main content

seam_server/
server.rs

1/* packages/server/core/rust/src/server.rs */
2
3use std::collections::HashMap;
4use std::convert::Infallible;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use axum::extract::{MatchedPath, Path, Query, State};
9use axum::http::StatusCode;
10use axum::response::sse::{Event, Sse};
11use axum::response::{Html, IntoResponse, Response};
12use axum::routing::{get, post};
13use axum::Router;
14use futures_core::Stream;
15use tokio::net::TcpListener;
16use tokio::task::JoinSet;
17use tokio_stream::StreamExt;
18
19use crate::errors::SeamError;
20use crate::manifest::build_manifest;
21use crate::page::PageDef;
22use crate::procedure::{ProcedureDef, SubscriptionDef};
23
24struct AppState {
25  manifest_json: serde_json::Value,
26  handlers: HashMap<String, Arc<ProcedureDef>>,
27  subscriptions: HashMap<String, Arc<SubscriptionDef>>,
28  pages: HashMap<String, Arc<PageDef>>,
29}
30
31pub struct SeamServer {
32  procedures: Vec<ProcedureDef>,
33  subscriptions: Vec<SubscriptionDef>,
34  pages: Vec<PageDef>,
35}
36
37impl SeamServer {
38  pub fn new() -> Self {
39    Self { procedures: Vec::new(), subscriptions: Vec::new(), pages: Vec::new() }
40  }
41
42  pub fn procedure(mut self, proc: ProcedureDef) -> Self {
43    self.procedures.push(proc);
44    self
45  }
46
47  pub fn subscription(mut self, sub: SubscriptionDef) -> Self {
48    self.subscriptions.push(sub);
49    self
50  }
51
52  pub fn page(mut self, page: PageDef) -> Self {
53    self.pages.push(page);
54    self
55  }
56
57  pub fn into_router(self) -> Router {
58    let manifest = build_manifest(&self.procedures, &self.subscriptions);
59    let manifest_json = serde_json::to_value(&manifest).expect("manifest serialization");
60
61    let handlers: HashMap<String, Arc<ProcedureDef>> =
62      self.procedures.into_iter().map(|p| (p.name.clone(), Arc::new(p))).collect();
63
64    let subscriptions: HashMap<String, Arc<SubscriptionDef>> =
65      self.subscriptions.into_iter().map(|s| (s.name.clone(), Arc::new(s))).collect();
66
67    let mut pages = HashMap::new();
68    let mut router = Router::new()
69      .route("/_seam/manifest.json", get(handle_manifest))
70      .route("/_seam/rpc/{name}", post(handle_rpc))
71      .route("/_seam/subscribe/{name}", get(handle_subscribe));
72
73    for page in self.pages {
74      let full_route = format!("/_seam/page{}", page.route);
75      pages.insert(full_route.clone(), Arc::new(page));
76      router = router.route(&full_route, get(handle_page));
77    }
78
79    let state = Arc::new(AppState { manifest_json, handlers, subscriptions, pages });
80
81    router.with_state(state)
82  }
83
84  pub async fn serve(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
85    let router = self.into_router();
86    let listener = TcpListener::bind(addr).await?;
87    let local_addr = listener.local_addr()?;
88    println!("Seam Rust backend running on http://localhost:{}", local_addr.port());
89    axum::serve(listener, router).await?;
90    Ok(())
91  }
92}
93
94impl Default for SeamServer {
95  fn default() -> Self {
96    Self::new()
97  }
98}
99
100impl IntoResponse for SeamError {
101  fn into_response(self) -> Response {
102    let status = StatusCode::from_u16(self.status()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
103    let body = serde_json::json!({
104      "error": {
105        "code": self.code(),
106        "message": self.message(),
107      }
108    });
109    (status, axum::Json(body)).into_response()
110  }
111}
112
113async fn handle_manifest(State(state): State<Arc<AppState>>) -> impl IntoResponse {
114  axum::Json(state.manifest_json.clone())
115}
116
117async fn handle_rpc(
118  State(state): State<Arc<AppState>>,
119  Path(name): Path<String>,
120  body: axum::body::Bytes,
121) -> Result<impl IntoResponse, SeamError> {
122  let proc = state
123    .handlers
124    .get(&name)
125    .ok_or_else(|| SeamError::not_found(format!("Procedure '{name}' not found")))?;
126
127  let input: serde_json::Value =
128    serde_json::from_slice(&body).map_err(|e| SeamError::validation(e.to_string()))?;
129
130  let result = (proc.handler)(input).await?;
131  Ok(axum::Json(result))
132}
133
134#[derive(serde::Deserialize)]
135struct SubscribeQuery {
136  input: Option<String>,
137}
138
139async fn handle_subscribe(
140  State(state): State<Arc<AppState>>,
141  Path(name): Path<String>,
142  Query(query): Query<SubscribeQuery>,
143) -> Sse<Pin<Box<dyn Stream<Item = Result<Event, Infallible>> + Send>>> {
144  let setup = async {
145    let sub = state
146      .subscriptions
147      .get(&name)
148      .ok_or_else(|| SeamError::not_found(format!("Subscription '{name}' not found")))?;
149
150    let raw_input = match &query.input {
151      Some(s) => serde_json::from_str(s).map_err(|e| SeamError::validation(e.to_string()))?,
152      None => serde_json::Value::Object(serde_json::Map::new()),
153    };
154
155    let data_stream = (sub.handler)(raw_input).await?;
156    Ok::<_, SeamError>(data_stream)
157  };
158
159  match setup.await {
160    Ok(data_stream) => {
161      let event_stream = data_stream
162        .map(|item| match item {
163          Ok(value) => {
164            let data = serde_json::to_string(&value).unwrap_or_default();
165            Ok(Event::default().event("data").data(data))
166          }
167          Err(e) => {
168            let payload = serde_json::json!({ "code": e.code(), "message": e.message() });
169            Ok(Event::default().event("error").data(payload.to_string()))
170          }
171        })
172        .chain(tokio_stream::once(Ok(Event::default().event("complete").data("{}"))));
173      Sse::new(Box::pin(event_stream))
174    }
175    Err(err) => {
176      let payload = serde_json::json!({ "code": err.code(), "message": err.message() });
177      let error_event = Event::default().event("error").data(payload.to_string());
178      Sse::new(Box::pin(tokio_stream::once(Ok(error_event))))
179    }
180  }
181}
182
183async fn handle_page(
184  State(state): State<Arc<AppState>>,
185  matched: MatchedPath,
186  Path(params): Path<HashMap<String, String>>,
187) -> Result<Html<String>, SeamError> {
188  let route_pattern = matched.as_str().to_string();
189  let page =
190    state.pages.get(&route_pattern).ok_or_else(|| SeamError::not_found("Page not found"))?;
191
192  let mut join_set = JoinSet::new();
193
194  let handlers = state.handlers.clone();
195  for loader in &page.loaders {
196    let input = (loader.input_fn)(&params);
197    let proc_name = loader.procedure.clone();
198    let data_key = loader.data_key.clone();
199    let handlers = handlers.clone();
200
201    join_set.spawn(async move {
202      let proc = handlers
203        .get(&proc_name)
204        .ok_or_else(|| SeamError::internal(format!("Procedure '{proc_name}' not found")))?;
205      let result = (proc.handler)(input).await?;
206      Ok::<(String, serde_json::Value), SeamError>((data_key, result))
207    });
208  }
209
210  let mut data = serde_json::Map::new();
211  while let Some(result) = join_set.join_next().await {
212    let (key, value) = result
213      .map_err(|e| SeamError::internal(e.to_string()))? // JoinError -> Internal (task panic)
214      ?; // SeamError propagates unchanged
215    data.insert(key, value);
216  }
217
218  let html = seam_injector::inject(&page.template, &serde_json::Value::Object(data));
219  Ok(Html(html))
220}