Skip to main content

seam_server/
server.rs

1/* packages/server/core/rust/src/server.rs */
2
3use std::collections::HashMap;
4use std::convert::Infallible;
5use std::sync::Arc;
6
7use axum::extract::{MatchedPath, Path, Query, State};
8use axum::response::sse::{Event, Sse};
9use axum::response::{Html, IntoResponse};
10use axum::routing::{get, post};
11use axum::Router;
12use futures_core::Stream;
13use tokio::net::TcpListener;
14use tokio::task::JoinSet;
15use tokio_stream::StreamExt;
16
17use crate::errors::SeamError;
18use crate::injector;
19use crate::manifest::build_manifest;
20use crate::page::PageDef;
21use crate::procedure::{ProcedureDef, SubscriptionDef};
22
23struct AppState {
24  manifest_json: serde_json::Value,
25  handlers: HashMap<String, Arc<ProcedureDef>>,
26  subscriptions: HashMap<String, Arc<SubscriptionDef>>,
27  pages: HashMap<String, Arc<PageDef>>,
28}
29
30pub struct SeamServer {
31  procedures: Vec<ProcedureDef>,
32  subscriptions: Vec<SubscriptionDef>,
33  pages: Vec<PageDef>,
34}
35
36impl SeamServer {
37  pub fn new() -> Self {
38    Self { procedures: Vec::new(), subscriptions: Vec::new(), pages: Vec::new() }
39  }
40
41  pub fn procedure(mut self, proc: ProcedureDef) -> Self {
42    self.procedures.push(proc);
43    self
44  }
45
46  pub fn subscription(mut self, sub: SubscriptionDef) -> Self {
47    self.subscriptions.push(sub);
48    self
49  }
50
51  pub fn page(mut self, page: PageDef) -> Self {
52    self.pages.push(page);
53    self
54  }
55
56  pub fn into_router(self) -> Router {
57    let manifest = build_manifest(&self.procedures, &self.subscriptions);
58    let manifest_json = serde_json::to_value(&manifest).expect("manifest serialization");
59
60    let handlers: HashMap<String, Arc<ProcedureDef>> =
61      self.procedures.into_iter().map(|p| (p.name.clone(), Arc::new(p))).collect();
62
63    let subscriptions: HashMap<String, Arc<SubscriptionDef>> =
64      self.subscriptions.into_iter().map(|s| (s.name.clone(), Arc::new(s))).collect();
65
66    let mut pages = HashMap::new();
67    let mut router = Router::new()
68      .route("/_seam/manifest.json", get(handle_manifest))
69      .route("/_seam/rpc/{name}", post(handle_rpc))
70      .route("/_seam/subscribe/{name}", get(handle_subscribe));
71
72    for page in self.pages {
73      let full_route = format!("/_seam/page{}", page.route);
74      pages.insert(full_route.clone(), Arc::new(page));
75      router = router.route(&full_route, get(handle_page));
76    }
77
78    let state = Arc::new(AppState { manifest_json, handlers, subscriptions, pages });
79
80    router.with_state(state)
81  }
82
83  pub async fn serve(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
84    let router = self.into_router();
85    let listener = TcpListener::bind(addr).await?;
86    println!("Seam Rust backend running on http://{addr}");
87    axum::serve(listener, router).await?;
88    Ok(())
89  }
90}
91
92impl Default for SeamServer {
93  fn default() -> Self {
94    Self::new()
95  }
96}
97
98async fn handle_manifest(State(state): State<Arc<AppState>>) -> impl IntoResponse {
99  axum::Json(state.manifest_json.clone())
100}
101
102async fn handle_rpc(
103  State(state): State<Arc<AppState>>,
104  Path(name): Path<String>,
105  body: axum::body::Bytes,
106) -> Result<impl IntoResponse, SeamError> {
107  let proc = state
108    .handlers
109    .get(&name)
110    .ok_or_else(|| SeamError::not_found(format!("Procedure '{name}' not found")))?;
111
112  let input: serde_json::Value =
113    serde_json::from_slice(&body).map_err(|e| SeamError::validation(e.to_string()))?;
114
115  let result = (proc.handler)(input).await?;
116  Ok(axum::Json(result))
117}
118
119#[derive(serde::Deserialize)]
120struct SubscribeQuery {
121  input: Option<String>,
122}
123
124async fn handle_subscribe(
125  State(state): State<Arc<AppState>>,
126  Path(name): Path<String>,
127  Query(query): Query<SubscribeQuery>,
128) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, SeamError> {
129  let sub = state
130    .subscriptions
131    .get(&name)
132    .ok_or_else(|| SeamError::not_found(format!("Subscription '{name}' not found")))?;
133
134  let raw_input = match &query.input {
135    Some(s) => serde_json::from_str(s).map_err(|e| SeamError::validation(e.to_string()))?,
136    None => serde_json::Value::Object(serde_json::Map::new()),
137  };
138
139  let data_stream = (sub.handler)(raw_input).await?;
140
141  let event_stream = data_stream
142    .map(|item| match item {
143      Ok(value) => {
144        let data = serde_json::to_string(&value).unwrap_or_default();
145        Ok(Event::default().event("data").data(data))
146      }
147      Err(e) => {
148        let payload = serde_json::json!({ "code": e.to_string().split(':').next().unwrap_or("INTERNAL_ERROR"), "message": e.to_string() });
149        Ok(Event::default().event("error").data(payload.to_string()))
150      }
151    })
152    .chain(tokio_stream::once(Ok(Event::default().event("complete").data("{}"))));
153
154  Ok(Sse::new(event_stream))
155}
156
157async fn handle_page(
158  State(state): State<Arc<AppState>>,
159  matched: MatchedPath,
160  Path(params): Path<HashMap<String, String>>,
161) -> Result<Html<String>, SeamError> {
162  let route_pattern = matched.as_str().to_string();
163  let page =
164    state.pages.get(&route_pattern).ok_or_else(|| SeamError::not_found("Page not found"))?;
165
166  let mut join_set = JoinSet::new();
167
168  let handlers = state.handlers.clone();
169  for loader in &page.loaders {
170    let input = (loader.input_fn)(&params);
171    let proc_name = loader.procedure.clone();
172    let data_key = loader.data_key.clone();
173    let handlers = handlers.clone();
174
175    join_set.spawn(async move {
176      let proc = handlers
177        .get(&proc_name)
178        .ok_or_else(|| SeamError::internal(format!("Procedure '{proc_name}' not found")))?;
179      let result = (proc.handler)(input).await?;
180      Ok::<(String, serde_json::Value), SeamError>((data_key, result))
181    });
182  }
183
184  let mut data = serde_json::Map::new();
185  while let Some(result) = join_set.join_next().await {
186    let (key, value) = result
187      .map_err(|e| SeamError::internal(e.to_string()))? // JoinError -> Internal (task panic)
188      ?; // SeamError propagates unchanged
189    data.insert(key, value);
190  }
191
192  let html = injector::inject(&page.template, &serde_json::Value::Object(data));
193  Ok(Html(html))
194}