1use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
8use async_trait::async_trait;
9use axum::{
10 extract::{Path, State},
11 http::StatusCode,
12 response::{IntoResponse, Json},
13 routing::get,
14 Router,
15};
16use clasp_core::{Message, PublishMessage, SetMessage, SignalType, Value};
17use parking_lot::Mutex;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::net::SocketAddr;
21use std::sync::Arc;
22use tokio::sync::mpsc;
23use tower_http::cors::{Any, CorsLayer};
24use tower_http::trace::TraceLayer;
25use tracing::{debug, error, info};
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
29#[serde(rename_all = "UPPERCASE")]
30pub enum HttpMethod {
31 #[default]
32 GET,
33 POST,
34 PUT,
35 DELETE,
36 PATCH,
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
41#[serde(rename_all = "lowercase")]
42pub enum HttpMode {
43 #[default]
45 Server,
46 Client,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct EndpointConfig {
53 pub path: String,
55 #[serde(default)]
57 pub method: HttpMethod,
58 pub clasp_address: String,
60 #[serde(default)]
62 pub description: Option<String>,
63 #[serde(default = "endpoint_enabled_default")]
65 pub enabled: bool,
66 #[serde(default)]
68 pub required_scope: Option<String>,
69 #[serde(default)]
71 pub rate_limit: u32,
72}
73
74fn endpoint_enabled_default() -> bool {
75 true
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct HttpBridgeConfig {
81 #[serde(default)]
83 pub mode: HttpMode,
84 pub url: String,
87 #[serde(default)]
89 pub endpoints: Vec<EndpointConfig>,
90 #[serde(default = "default_true")]
92 pub cors_enabled: bool,
93 #[serde(default)]
95 pub cors_origins: Vec<String>,
96 #[serde(default = "default_base_path")]
98 pub base_path: String,
99 #[serde(default = "default_timeout")]
101 pub timeout_secs: u32,
102 #[serde(default = "default_namespace")]
104 pub namespace: String,
105 #[serde(default)]
107 pub poll_interval_ms: u64,
108 #[serde(default)]
110 pub poll_endpoints: Vec<String>,
111}
112
113fn default_true() -> bool {
114 true
115}
116
117fn default_base_path() -> String {
118 "/api".to_string()
119}
120
121fn default_timeout() -> u32 {
122 30
123}
124
125fn default_namespace() -> String {
126 "/http".to_string()
127}
128
129impl Default for HttpBridgeConfig {
130 fn default() -> Self {
131 Self {
132 mode: HttpMode::Server,
133 url: "0.0.0.0:3000".to_string(),
134 endpoints: vec![
135 EndpointConfig {
136 path: "/signals".to_string(),
137 method: HttpMethod::GET,
138 clasp_address: "/**".to_string(),
139 description: Some("List all signals".to_string()),
140 enabled: true,
141 required_scope: None,
142 rate_limit: 0,
143 },
144 EndpointConfig {
145 path: "/signals/*path".to_string(),
146 method: HttpMethod::GET,
147 clasp_address: "/{path}".to_string(),
148 description: Some("Get signal value".to_string()),
149 enabled: true,
150 required_scope: None,
151 rate_limit: 0,
152 },
153 EndpointConfig {
154 path: "/signals/*path".to_string(),
155 method: HttpMethod::PUT,
156 clasp_address: "/{path}".to_string(),
157 description: Some("Set signal value".to_string()),
158 enabled: true,
159 required_scope: Some("write".to_string()),
160 rate_limit: 0,
161 },
162 EndpointConfig {
163 path: "/signals/*path".to_string(),
164 method: HttpMethod::POST,
165 clasp_address: "/{path}".to_string(),
166 description: Some("Publish event".to_string()),
167 enabled: true,
168 required_scope: Some("write".to_string()),
169 rate_limit: 0,
170 },
171 ],
172 cors_enabled: true,
173 cors_origins: vec![],
174 base_path: "/api".to_string(),
175 timeout_secs: 30,
176 namespace: "/http".to_string(),
177 poll_interval_ms: 0,
178 poll_endpoints: vec![],
179 }
180 }
181}
182
183#[derive(Clone)]
185struct AppState {
186 event_tx: mpsc::Sender<BridgeEvent>,
187 signals: Arc<parking_lot::RwLock<HashMap<String, Value>>>,
188 namespace: String,
189}
190
191pub struct HttpBridge {
193 config: BridgeConfig,
194 http_config: HttpBridgeConfig,
195 running: Arc<Mutex<bool>>,
196 shutdown_tx: Option<mpsc::Sender<()>>,
197 signals: Arc<parking_lot::RwLock<HashMap<String, Value>>>,
198}
199
200impl HttpBridge {
201 pub fn new(http_config: HttpBridgeConfig) -> Self {
203 let config = BridgeConfig {
204 name: "HTTP Bridge".to_string(),
205 protocol: "http".to_string(),
206 bidirectional: true,
207 ..Default::default()
208 };
209
210 Self {
211 config,
212 http_config,
213 running: Arc::new(Mutex::new(false)),
214 shutdown_tx: None,
215 signals: Arc::new(parking_lot::RwLock::new(HashMap::new())),
216 }
217 }
218
219 fn json_to_value(json: serde_json::Value) -> Value {
221 match json {
222 serde_json::Value::Null => Value::Null,
223 serde_json::Value::Bool(b) => Value::Bool(b),
224 serde_json::Value::Number(n) => {
225 if let Some(i) = n.as_i64() {
226 Value::Int(i)
227 } else if let Some(f) = n.as_f64() {
228 Value::Float(f)
229 } else {
230 Value::Null
231 }
232 }
233 serde_json::Value::String(s) => Value::String(s),
234 serde_json::Value::Array(arr) => {
235 Value::Array(arr.into_iter().map(Self::json_to_value).collect())
236 }
237 serde_json::Value::Object(obj) => {
238 let map: HashMap<String, Value> = obj
239 .into_iter()
240 .map(|(k, v)| (k, Self::json_to_value(v)))
241 .collect();
242 Value::Map(map)
243 }
244 }
245 }
246
247 fn value_to_json(value: &Value) -> serde_json::Value {
249 match value {
250 Value::Null => serde_json::Value::Null,
251 Value::Bool(b) => serde_json::Value::Bool(*b),
252 Value::Int(i) => serde_json::Value::Number((*i).into()),
253 Value::Float(f) => serde_json::Number::from_f64(*f)
254 .map(serde_json::Value::Number)
255 .unwrap_or(serde_json::Value::Null),
256 Value::String(s) => serde_json::Value::String(s.clone()),
257 Value::Bytes(b) => serde_json::Value::Array(
258 b.iter()
259 .map(|&x| serde_json::Value::Number(x.into()))
260 .collect(),
261 ),
262 Value::Array(arr) => {
263 serde_json::Value::Array(arr.iter().map(Self::value_to_json).collect())
264 }
265 Value::Map(m) => serde_json::Value::Object(
266 m.iter()
267 .map(|(k, v)| (k.clone(), Self::value_to_json(v)))
268 .collect(),
269 ),
270 }
271 }
272
273 fn build_router(state: AppState, base_path: &str) -> Router {
275 Router::new()
276 .route(&format!("{}/signals", base_path), get(list_signals))
277 .route(
278 &format!("{}/*path", base_path),
279 get(get_signal)
280 .put(set_signal)
281 .post(publish_event)
282 .delete(delete_signal),
283 )
284 .route(&format!("{}/health", base_path), get(health_check))
285 .layer(TraceLayer::new_for_http())
286 .with_state(state)
287 }
288
289 pub fn update_signal(&self, address: &str, value: Value) {
291 self.signals.write().insert(address.to_string(), value);
292 }
293}
294
295async fn health_check() -> impl IntoResponse {
298 Json(serde_json::json!({
299 "status": "ok",
300 "protocol": "CLASP",
301 "version": "0.1.0"
302 }))
303}
304
305async fn list_signals(State(state): State<AppState>) -> impl IntoResponse {
306 let signals = state.signals.read();
307 let list: Vec<serde_json::Value> = signals
308 .iter()
309 .map(|(addr, val)| {
310 serde_json::json!({
311 "address": addr,
312 "value": HttpBridge::value_to_json(val)
313 })
314 })
315 .collect();
316
317 Json(serde_json::json!({
318 "signals": list,
319 "count": list.len()
320 }))
321}
322
323async fn get_signal(State(state): State<AppState>, Path(path): Path<String>) -> impl IntoResponse {
324 let address = format!("/{}", path);
325 let signals = state.signals.read();
326
327 if let Some(value) = signals.get(&address) {
328 Json(serde_json::json!({
329 "address": address,
330 "value": HttpBridge::value_to_json(value)
331 }))
332 .into_response()
333 } else {
334 (
335 StatusCode::NOT_FOUND,
336 Json(serde_json::json!({
337 "error": "Signal not found",
338 "address": address
339 })),
340 )
341 .into_response()
342 }
343}
344
345async fn set_signal(
346 State(state): State<AppState>,
347 Path(path): Path<String>,
348 Json(body): Json<serde_json::Value>,
349) -> impl IntoResponse {
350 let address = format!("{}/{}", state.namespace, path);
351
352 let value = if let Some(v) = body.get("value") {
353 HttpBridge::json_to_value(v.clone())
354 } else {
355 HttpBridge::json_to_value(body)
356 };
357
358 state.signals.write().insert(address.clone(), value.clone());
360
361 let msg = Message::Set(SetMessage {
363 address: address.clone(),
364 value: value.clone(),
365 revision: None,
366 lock: false,
367 unlock: false,
368 ttl: None,
369 });
370
371 if let Err(e) = state
372 .event_tx
373 .send(BridgeEvent::ToClasp(Box::new(msg)))
374 .await
375 {
376 error!("Failed to send set event: {}", e);
377 return (
378 StatusCode::INTERNAL_SERVER_ERROR,
379 Json(serde_json::json!({ "error": "Internal error" })),
380 )
381 .into_response();
382 }
383
384 Json(serde_json::json!({
385 "address": address,
386 "value": HttpBridge::value_to_json(&value),
387 "status": "set"
388 }))
389 .into_response()
390}
391
392async fn publish_event(
393 State(state): State<AppState>,
394 Path(path): Path<String>,
395 Json(body): Json<serde_json::Value>,
396) -> impl IntoResponse {
397 let address = format!("{}/{}", state.namespace, path);
398
399 let value = if let Some(v) = body.get("value") {
400 HttpBridge::json_to_value(v.clone())
401 } else {
402 HttpBridge::json_to_value(body)
403 };
404
405 let msg = Message::Publish(PublishMessage {
407 address: address.clone(),
408 signal: Some(SignalType::Event),
409 value: Some(value.clone()),
410 payload: None,
411 samples: None,
412 rate: None,
413 id: None,
414 phase: None,
415 timestamp: None,
416 timeline: None,
417 });
418
419 if let Err(e) = state
420 .event_tx
421 .send(BridgeEvent::ToClasp(Box::new(msg)))
422 .await
423 {
424 error!("Failed to send publish event: {}", e);
425 return (
426 StatusCode::INTERNAL_SERVER_ERROR,
427 Json(serde_json::json!({ "error": "Internal error" })),
428 )
429 .into_response();
430 }
431
432 Json(serde_json::json!({
433 "address": address,
434 "value": HttpBridge::value_to_json(&value),
435 "status": "published"
436 }))
437 .into_response()
438}
439
440async fn delete_signal(
441 State(state): State<AppState>,
442 Path(path): Path<String>,
443) -> impl IntoResponse {
444 let address = format!("{}/{}", state.namespace, path);
445
446 let removed = state.signals.write().remove(&address);
448
449 if removed.is_some() {
450 let msg = Message::Set(SetMessage {
452 address: address.clone(),
453 value: Value::Null,
454 revision: None,
455 lock: false,
456 unlock: false,
457 ttl: None,
458 });
459
460 let _ = state
461 .event_tx
462 .send(BridgeEvent::ToClasp(Box::new(msg)))
463 .await;
464
465 Json(serde_json::json!({
466 "address": address,
467 "status": "deleted"
468 }))
469 .into_response()
470 } else {
471 (
472 StatusCode::NOT_FOUND,
473 Json(serde_json::json!({
474 "error": "Signal not found",
475 "address": address
476 })),
477 )
478 .into_response()
479 }
480}
481
482#[async_trait]
483impl Bridge for HttpBridge {
484 fn config(&self) -> &BridgeConfig {
485 &self.config
486 }
487
488 async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
489 if *self.running.lock() {
490 return Err(BridgeError::Other("Bridge already running".to_string()));
491 }
492
493 let (tx, rx) = mpsc::channel(100);
494 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
495 self.shutdown_tx = Some(shutdown_tx);
496
497 match self.http_config.mode {
498 HttpMode::Server => {
499 let addr: SocketAddr = self
500 .http_config
501 .url
502 .parse()
503 .map_err(|e| BridgeError::Other(format!("Invalid address: {}", e)))?;
504
505 let app_state = AppState {
506 event_tx: tx.clone(),
507 signals: self.signals.clone(),
508 namespace: self.http_config.namespace.clone(),
509 };
510
511 let mut router = Self::build_router(app_state, &self.http_config.base_path);
512
513 if self.http_config.cors_enabled {
515 let cors = CorsLayer::new()
516 .allow_origin(Any)
517 .allow_methods(Any)
518 .allow_headers(Any);
519 router = router.layer(cors);
520 }
521
522 let running = self.running.clone();
523 let tx_clone = tx.clone();
524
525 tokio::spawn(async move {
526 let listener = match tokio::net::TcpListener::bind(addr).await {
527 Ok(l) => l,
528 Err(e) => {
529 error!("Failed to bind HTTP server: {}", e);
530 let _ = tx_clone
531 .send(BridgeEvent::Error(format!("Bind failed: {}", e)))
532 .await;
533 return;
534 }
535 };
536
537 info!("HTTP server listening on {}", addr);
538 *running.lock() = true;
539 let _ = tx_clone.send(BridgeEvent::Connected).await;
540
541 axum::serve(listener, router)
542 .with_graceful_shutdown(async move {
543 let _ = shutdown_rx.recv().await;
544 })
545 .await
546 .ok();
547
548 *running.lock() = false;
549 let _ = tx_clone
550 .send(BridgeEvent::Disconnected {
551 reason: Some("Server stopped".to_string()),
552 })
553 .await;
554 info!("HTTP server stopped");
555 });
556
557 *self.running.lock() = true;
558 info!(
559 "HTTP bridge started in server mode on {}",
560 self.http_config.url
561 );
562 }
563 HttpMode::Client => {
564 *self.running.lock() = true;
566 let _ = tx.send(BridgeEvent::Connected).await;
567
568 if self.http_config.poll_interval_ms > 0 {
570 let poll_interval =
571 std::time::Duration::from_millis(self.http_config.poll_interval_ms);
572 let base_url = self.http_config.url.clone();
573 let timeout_secs = self.http_config.timeout_secs;
574 let namespace = self.http_config.namespace.clone();
575 let poll_endpoints = if self.http_config.poll_endpoints.is_empty() {
576 vec!["/api/signals".to_string()]
577 } else {
578 self.http_config.poll_endpoints.clone()
579 };
580 let signals = self.signals.clone();
581 let running = self.running.clone();
582 let tx_clone = tx.clone();
583
584 tokio::spawn(async move {
585 let client = match reqwest::Client::builder()
586 .timeout(std::time::Duration::from_secs(timeout_secs as u64))
587 .build()
588 {
589 Ok(c) => c,
590 Err(e) => {
591 error!("Failed to create HTTP client: {}", e);
592 return;
593 }
594 };
595
596 let mut interval = tokio::time::interval(poll_interval);
597 info!(
598 "HTTP polling started with interval {}ms",
599 poll_interval.as_millis()
600 );
601
602 loop {
603 interval.tick().await;
604
605 if !*running.lock() {
606 break;
607 }
608
609 for endpoint in &poll_endpoints {
610 let url = format!("{}{}", base_url, endpoint);
611
612 match client.get(&url).send().await {
613 Ok(response) => {
614 if response.status().is_success() {
615 if let Ok(json) =
616 response.json::<serde_json::Value>().await
617 {
618 if let Some(signals_arr) =
620 json.get("signals").and_then(|s| s.as_array())
621 {
622 for signal in signals_arr {
623 if let (Some(addr), Some(val)) = (
624 signal
625 .get("address")
626 .and_then(|a| a.as_str()),
627 signal.get("value"),
628 ) {
629 let clasp_addr =
630 format!("{}{}", namespace, addr);
631 let value = HttpBridge::json_to_value(
632 val.clone(),
633 );
634
635 let changed = {
637 let current = signals.read();
638 current.get(&clasp_addr)
639 != Some(&value)
640 };
641
642 if changed {
643 signals.write().insert(
644 clasp_addr.clone(),
645 value.clone(),
646 );
647
648 let msg =
649 Message::Set(SetMessage {
650 address: clasp_addr,
651 value,
652 revision: None,
653 lock: false,
654 unlock: false,
655 ttl: None,
656 });
657
658 if let Err(e) = tx_clone
659 .send(BridgeEvent::ToClasp(
660 Box::new(msg),
661 ))
662 .await
663 {
664 debug!("Failed to send polled data: {}", e);
665 }
666 }
667 }
668 }
669 } else if let Some(value) = json.get("value") {
670 if let Some(addr) =
672 json.get("address").and_then(|a| a.as_str())
673 {
674 let clasp_addr =
675 format!("{}{}", namespace, addr);
676 let value = HttpBridge::json_to_value(
677 value.clone(),
678 );
679
680 let changed = {
681 let current = signals.read();
682 current.get(&clasp_addr) != Some(&value)
683 };
684
685 if changed {
686 signals.write().insert(
687 clasp_addr.clone(),
688 value.clone(),
689 );
690
691 let msg = Message::Set(SetMessage {
692 address: clasp_addr,
693 value,
694 revision: None,
695 lock: false,
696 unlock: false,
697 ttl: None,
698 });
699
700 let _ = tx_clone
701 .send(BridgeEvent::ToClasp(
702 Box::new(msg),
703 ))
704 .await;
705 }
706 }
707 }
708 }
709 }
710 }
711 Err(e) => {
712 debug!("HTTP poll error for {}: {}", url, e);
713 }
714 }
715 }
716 }
717
718 info!("HTTP polling stopped");
719 });
720 }
721
722 info!(
723 "HTTP bridge started in client mode for {}",
724 self.http_config.url
725 );
726 }
727 }
728
729 Ok(rx)
730 }
731
732 async fn stop(&mut self) -> Result<()> {
733 *self.running.lock() = false;
734 if let Some(tx) = self.shutdown_tx.take() {
735 let _ = tx.send(()).await;
736 }
737 info!("HTTP bridge stopped");
738 Ok(())
739 }
740
741 async fn send(&self, msg: Message) -> Result<()> {
742 if !*self.running.lock() {
743 return Err(BridgeError::Other("Not connected".to_string()));
744 }
745
746 match self.http_config.mode {
747 HttpMode::Server => {
748 match &msg {
750 Message::Set(set) => {
751 self.signals
752 .write()
753 .insert(set.address.clone(), set.value.clone());
754 }
755 Message::Publish(pub_msg) => {
756 if let Some(value) = &pub_msg.value {
757 self.signals
758 .write()
759 .insert(pub_msg.address.clone(), value.clone());
760 }
761 }
762 _ => {}
763 }
764 debug!("HTTP server cached CLASP message");
765 Ok(())
766 }
767 HttpMode::Client => {
768 let (address, value, method) = match &msg {
770 Message::Set(set) => (&set.address, &set.value, HttpMethod::PUT),
771 Message::Publish(pub_msg) => {
772 if let Some(val) = &pub_msg.value {
773 (&pub_msg.address, val, HttpMethod::POST)
774 } else {
775 return Ok(());
776 }
777 }
778 _ => return Ok(()),
779 };
780
781 let url = format!("{}{}", self.http_config.url, address);
782 let body = Self::value_to_json(value);
783
784 let client = reqwest::Client::builder()
785 .timeout(std::time::Duration::from_secs(
786 self.http_config.timeout_secs as u64,
787 ))
788 .build()
789 .map_err(|e| BridgeError::Other(format!("HTTP client error: {}", e)))?;
790
791 let request = match method {
792 HttpMethod::GET => client.get(&url),
793 HttpMethod::POST => client.post(&url).json(&body),
794 HttpMethod::PUT => client.put(&url).json(&body),
795 HttpMethod::DELETE => client.delete(&url),
796 HttpMethod::PATCH => client.patch(&url).json(&body),
797 };
798
799 let response = request
800 .send()
801 .await
802 .map_err(|e| BridgeError::Other(format!("HTTP request failed: {}", e)))?;
803
804 debug!("HTTP {} {} -> {}", method, url, response.status());
805 Ok(())
806 }
807 }
808 }
809
810 fn is_running(&self) -> bool {
811 *self.running.lock()
812 }
813
814 fn namespace(&self) -> &str {
815 &self.http_config.namespace
816 }
817}
818
819impl std::fmt::Display for HttpMethod {
820 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
821 match self {
822 HttpMethod::GET => write!(f, "GET"),
823 HttpMethod::POST => write!(f, "POST"),
824 HttpMethod::PUT => write!(f, "PUT"),
825 HttpMethod::DELETE => write!(f, "DELETE"),
826 HttpMethod::PATCH => write!(f, "PATCH"),
827 }
828 }
829}
830
831#[cfg(test)]
832mod tests {
833 use super::*;
834
835 #[test]
836 fn test_config_default() {
837 let config = HttpBridgeConfig::default();
838 assert!(config.cors_enabled);
839 assert!(!config.endpoints.is_empty());
840 }
841
842 #[test]
843 fn test_value_conversion() {
844 let json = serde_json::json!({
845 "intensity": 0.75,
846 "enabled": true,
847 "name": "main light"
848 });
849
850 let value = HttpBridge::json_to_value(json.clone());
851 let back = HttpBridge::value_to_json(&value);
852
853 assert_eq!(json, back);
854 }
855}