1use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
8use async_trait::async_trait;
9use axum::{
10 extract::{Path, State},
11 http::StatusCode,
12 response::{IntoResponse, Json},
13 routing::get,
14 Router,
15};
16use clasp_core::{Message, PublishMessage, SetMessage, SignalType, Value};
17use parking_lot::Mutex;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::net::SocketAddr;
21use std::sync::Arc;
22use tokio::sync::mpsc;
23use tower_http::cors::{Any, CorsLayer};
24use tower_http::trace::TraceLayer;
25use tracing::{debug, error, info};
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
29#[serde(rename_all = "UPPERCASE")]
30pub enum HttpMethod {
31 #[default]
32 GET,
33 POST,
34 PUT,
35 DELETE,
36 PATCH,
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
41#[serde(rename_all = "lowercase")]
42pub enum HttpMode {
43 #[default]
45 Server,
46 Client,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct EndpointConfig {
53 pub path: String,
55 #[serde(default)]
57 pub method: HttpMethod,
58 pub clasp_address: String,
60 #[serde(default)]
62 pub description: Option<String>,
63 #[serde(default = "endpoint_enabled_default")]
65 pub enabled: bool,
66 #[serde(default)]
68 pub required_scope: Option<String>,
69 #[serde(default)]
71 pub rate_limit: u32,
72}
73
74fn endpoint_enabled_default() -> bool {
75 true
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct HttpBridgeConfig {
81 #[serde(default)]
83 pub mode: HttpMode,
84 pub url: String,
87 #[serde(default)]
89 pub endpoints: Vec<EndpointConfig>,
90 #[serde(default = "default_true")]
92 pub cors_enabled: bool,
93 #[serde(default)]
95 pub cors_origins: Vec<String>,
96 #[serde(default = "default_base_path")]
98 pub base_path: String,
99 #[serde(default = "default_timeout")]
101 pub timeout_secs: u32,
102 #[serde(default = "default_namespace")]
104 pub namespace: String,
105 #[serde(default)]
107 pub poll_interval_ms: u64,
108 #[serde(default)]
110 pub poll_endpoints: Vec<String>,
111}
112
113fn default_true() -> bool {
114 true
115}
116
117fn default_base_path() -> String {
118 "/api".to_string()
119}
120
121fn default_timeout() -> u32 {
122 30
123}
124
125fn default_namespace() -> String {
126 "/http".to_string()
127}
128
129impl Default for HttpBridgeConfig {
130 fn default() -> Self {
131 Self {
132 mode: HttpMode::Server,
133 url: "0.0.0.0:3000".to_string(),
134 endpoints: vec![
135 EndpointConfig {
136 path: "/signals".to_string(),
137 method: HttpMethod::GET,
138 clasp_address: "/**".to_string(),
139 description: Some("List all signals".to_string()),
140 enabled: true,
141 required_scope: None,
142 rate_limit: 0,
143 },
144 EndpointConfig {
145 path: "/signals/*path".to_string(),
146 method: HttpMethod::GET,
147 clasp_address: "/{path}".to_string(),
148 description: Some("Get signal value".to_string()),
149 enabled: true,
150 required_scope: None,
151 rate_limit: 0,
152 },
153 EndpointConfig {
154 path: "/signals/*path".to_string(),
155 method: HttpMethod::PUT,
156 clasp_address: "/{path}".to_string(),
157 description: Some("Set signal value".to_string()),
158 enabled: true,
159 required_scope: Some("write".to_string()),
160 rate_limit: 0,
161 },
162 EndpointConfig {
163 path: "/signals/*path".to_string(),
164 method: HttpMethod::POST,
165 clasp_address: "/{path}".to_string(),
166 description: Some("Publish event".to_string()),
167 enabled: true,
168 required_scope: Some("write".to_string()),
169 rate_limit: 0,
170 },
171 ],
172 cors_enabled: true,
173 cors_origins: vec![],
174 base_path: "/api".to_string(),
175 timeout_secs: 30,
176 namespace: "/http".to_string(),
177 poll_interval_ms: 0,
178 poll_endpoints: vec![],
179 }
180 }
181}
182
183#[derive(Clone)]
185struct AppState {
186 event_tx: mpsc::Sender<BridgeEvent>,
187 signals: Arc<parking_lot::RwLock<HashMap<String, Value>>>,
188 namespace: String,
189}
190
191pub struct HttpBridge {
193 config: BridgeConfig,
194 http_config: HttpBridgeConfig,
195 running: Arc<Mutex<bool>>,
196 shutdown_tx: Option<mpsc::Sender<()>>,
197 signals: Arc<parking_lot::RwLock<HashMap<String, Value>>>,
198}
199
200impl HttpBridge {
201 pub fn new(http_config: HttpBridgeConfig) -> Self {
203 let config = BridgeConfig {
204 name: "HTTP Bridge".to_string(),
205 protocol: "http".to_string(),
206 bidirectional: true,
207 ..Default::default()
208 };
209
210 Self {
211 config,
212 http_config,
213 running: Arc::new(Mutex::new(false)),
214 shutdown_tx: None,
215 signals: Arc::new(parking_lot::RwLock::new(HashMap::new())),
216 }
217 }
218
219 fn json_to_value(json: serde_json::Value) -> Value {
221 match json {
222 serde_json::Value::Null => Value::Null,
223 serde_json::Value::Bool(b) => Value::Bool(b),
224 serde_json::Value::Number(n) => {
225 if let Some(i) = n.as_i64() {
226 Value::Int(i)
227 } else if let Some(f) = n.as_f64() {
228 Value::Float(f)
229 } else {
230 Value::Null
231 }
232 }
233 serde_json::Value::String(s) => Value::String(s),
234 serde_json::Value::Array(arr) => {
235 Value::Array(arr.into_iter().map(Self::json_to_value).collect())
236 }
237 serde_json::Value::Object(obj) => {
238 let map: HashMap<String, Value> = obj
239 .into_iter()
240 .map(|(k, v)| (k, Self::json_to_value(v)))
241 .collect();
242 Value::Map(map)
243 }
244 }
245 }
246
247 fn value_to_json(value: &Value) -> serde_json::Value {
249 match value {
250 Value::Null => serde_json::Value::Null,
251 Value::Bool(b) => serde_json::Value::Bool(*b),
252 Value::Int(i) => serde_json::Value::Number((*i).into()),
253 Value::Float(f) => serde_json::Number::from_f64(*f)
254 .map(serde_json::Value::Number)
255 .unwrap_or(serde_json::Value::Null),
256 Value::String(s) => serde_json::Value::String(s.clone()),
257 Value::Bytes(b) => serde_json::Value::Array(
258 b.iter()
259 .map(|&x| serde_json::Value::Number(x.into()))
260 .collect(),
261 ),
262 Value::Array(arr) => {
263 serde_json::Value::Array(arr.iter().map(Self::value_to_json).collect())
264 }
265 Value::Map(m) => serde_json::Value::Object(
266 m.iter()
267 .map(|(k, v)| (k.clone(), Self::value_to_json(v)))
268 .collect(),
269 ),
270 }
271 }
272
273 fn build_router(state: AppState, base_path: &str) -> Router {
275 Router::new()
276 .route(&format!("{}/signals", base_path), get(list_signals))
277 .route(
278 &format!("{}/*path", base_path),
279 get(get_signal)
280 .put(set_signal)
281 .post(publish_event)
282 .delete(delete_signal),
283 )
284 .route(&format!("{}/health", base_path), get(health_check))
285 .layer(TraceLayer::new_for_http())
286 .with_state(state)
287 }
288
289 pub fn update_signal(&self, address: &str, value: Value) {
291 self.signals.write().insert(address.to_string(), value);
292 }
293}
294
295async fn health_check() -> impl IntoResponse {
298 Json(serde_json::json!({
299 "status": "ok",
300 "protocol": "CLASP",
301 "version": "0.1.0"
302 }))
303}
304
305async fn list_signals(State(state): State<AppState>) -> impl IntoResponse {
306 let signals = state.signals.read();
307 let list: Vec<serde_json::Value> = signals
308 .iter()
309 .map(|(addr, val)| {
310 serde_json::json!({
311 "address": addr,
312 "value": HttpBridge::value_to_json(val)
313 })
314 })
315 .collect();
316
317 Json(serde_json::json!({
318 "signals": list,
319 "count": list.len()
320 }))
321}
322
323async fn get_signal(State(state): State<AppState>, Path(path): Path<String>) -> impl IntoResponse {
324 let address = format!("/{}", path);
325 let signals = state.signals.read();
326
327 if let Some(value) = signals.get(&address) {
328 Json(serde_json::json!({
329 "address": address,
330 "value": HttpBridge::value_to_json(value)
331 }))
332 .into_response()
333 } else {
334 (
335 StatusCode::NOT_FOUND,
336 Json(serde_json::json!({
337 "error": "Signal not found",
338 "address": address
339 })),
340 )
341 .into_response()
342 }
343}
344
345async fn set_signal(
346 State(state): State<AppState>,
347 Path(path): Path<String>,
348 Json(body): Json<serde_json::Value>,
349) -> impl IntoResponse {
350 let address = format!("{}/{}", state.namespace, path);
351
352 let value = if let Some(v) = body.get("value") {
353 HttpBridge::json_to_value(v.clone())
354 } else {
355 HttpBridge::json_to_value(body)
356 };
357
358 state.signals.write().insert(address.clone(), value.clone());
360
361 let msg = Message::Set(SetMessage {
363 address: address.clone(),
364 value: value.clone(),
365 revision: None,
366 lock: false,
367 unlock: false,
368 });
369
370 if let Err(e) = state
371 .event_tx
372 .send(BridgeEvent::ToClasp(Box::new(msg)))
373 .await
374 {
375 error!("Failed to send set event: {}", e);
376 return (
377 StatusCode::INTERNAL_SERVER_ERROR,
378 Json(serde_json::json!({ "error": "Internal error" })),
379 )
380 .into_response();
381 }
382
383 Json(serde_json::json!({
384 "address": address,
385 "value": HttpBridge::value_to_json(&value),
386 "status": "set"
387 }))
388 .into_response()
389}
390
391async fn publish_event(
392 State(state): State<AppState>,
393 Path(path): Path<String>,
394 Json(body): Json<serde_json::Value>,
395) -> impl IntoResponse {
396 let address = format!("{}/{}", state.namespace, path);
397
398 let value = if let Some(v) = body.get("value") {
399 HttpBridge::json_to_value(v.clone())
400 } else {
401 HttpBridge::json_to_value(body)
402 };
403
404 let msg = Message::Publish(PublishMessage {
406 address: address.clone(),
407 signal: Some(SignalType::Event),
408 value: Some(value.clone()),
409 payload: None,
410 samples: None,
411 rate: None,
412 id: None,
413 phase: None,
414 timestamp: None,
415 timeline: None,
416 });
417
418 if let Err(e) = state
419 .event_tx
420 .send(BridgeEvent::ToClasp(Box::new(msg)))
421 .await
422 {
423 error!("Failed to send publish event: {}", e);
424 return (
425 StatusCode::INTERNAL_SERVER_ERROR,
426 Json(serde_json::json!({ "error": "Internal error" })),
427 )
428 .into_response();
429 }
430
431 Json(serde_json::json!({
432 "address": address,
433 "value": HttpBridge::value_to_json(&value),
434 "status": "published"
435 }))
436 .into_response()
437}
438
439async fn delete_signal(
440 State(state): State<AppState>,
441 Path(path): Path<String>,
442) -> impl IntoResponse {
443 let address = format!("{}/{}", state.namespace, path);
444
445 let removed = state.signals.write().remove(&address);
447
448 if removed.is_some() {
449 let msg = Message::Set(SetMessage {
451 address: address.clone(),
452 value: Value::Null,
453 revision: None,
454 lock: false,
455 unlock: false,
456 });
457
458 let _ = state
459 .event_tx
460 .send(BridgeEvent::ToClasp(Box::new(msg)))
461 .await;
462
463 Json(serde_json::json!({
464 "address": address,
465 "status": "deleted"
466 }))
467 .into_response()
468 } else {
469 (
470 StatusCode::NOT_FOUND,
471 Json(serde_json::json!({
472 "error": "Signal not found",
473 "address": address
474 })),
475 )
476 .into_response()
477 }
478}
479
480#[async_trait]
481impl Bridge for HttpBridge {
482 fn config(&self) -> &BridgeConfig {
483 &self.config
484 }
485
486 async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
487 if *self.running.lock() {
488 return Err(BridgeError::Other("Bridge already running".to_string()));
489 }
490
491 let (tx, rx) = mpsc::channel(100);
492 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
493 self.shutdown_tx = Some(shutdown_tx);
494
495 match self.http_config.mode {
496 HttpMode::Server => {
497 let addr: SocketAddr = self
498 .http_config
499 .url
500 .parse()
501 .map_err(|e| BridgeError::Other(format!("Invalid address: {}", e)))?;
502
503 let app_state = AppState {
504 event_tx: tx.clone(),
505 signals: self.signals.clone(),
506 namespace: self.http_config.namespace.clone(),
507 };
508
509 let mut router = Self::build_router(app_state, &self.http_config.base_path);
510
511 if self.http_config.cors_enabled {
513 let cors = CorsLayer::new()
514 .allow_origin(Any)
515 .allow_methods(Any)
516 .allow_headers(Any);
517 router = router.layer(cors);
518 }
519
520 let running = self.running.clone();
521 let tx_clone = tx.clone();
522
523 tokio::spawn(async move {
524 let listener = match tokio::net::TcpListener::bind(addr).await {
525 Ok(l) => l,
526 Err(e) => {
527 error!("Failed to bind HTTP server: {}", e);
528 let _ = tx_clone
529 .send(BridgeEvent::Error(format!("Bind failed: {}", e)))
530 .await;
531 return;
532 }
533 };
534
535 info!("HTTP server listening on {}", addr);
536 *running.lock() = true;
537 let _ = tx_clone.send(BridgeEvent::Connected).await;
538
539 axum::serve(listener, router)
540 .with_graceful_shutdown(async move {
541 let _ = shutdown_rx.recv().await;
542 })
543 .await
544 .ok();
545
546 *running.lock() = false;
547 let _ = tx_clone
548 .send(BridgeEvent::Disconnected {
549 reason: Some("Server stopped".to_string()),
550 })
551 .await;
552 info!("HTTP server stopped");
553 });
554
555 *self.running.lock() = true;
556 info!(
557 "HTTP bridge started in server mode on {}",
558 self.http_config.url
559 );
560 }
561 HttpMode::Client => {
562 *self.running.lock() = true;
564 let _ = tx.send(BridgeEvent::Connected).await;
565
566 if self.http_config.poll_interval_ms > 0 {
568 let poll_interval =
569 std::time::Duration::from_millis(self.http_config.poll_interval_ms);
570 let base_url = self.http_config.url.clone();
571 let timeout_secs = self.http_config.timeout_secs;
572 let namespace = self.http_config.namespace.clone();
573 let poll_endpoints = if self.http_config.poll_endpoints.is_empty() {
574 vec!["/api/signals".to_string()]
575 } else {
576 self.http_config.poll_endpoints.clone()
577 };
578 let signals = self.signals.clone();
579 let running = self.running.clone();
580 let tx_clone = tx.clone();
581
582 tokio::spawn(async move {
583 let client = match reqwest::Client::builder()
584 .timeout(std::time::Duration::from_secs(timeout_secs as u64))
585 .build()
586 {
587 Ok(c) => c,
588 Err(e) => {
589 error!("Failed to create HTTP client: {}", e);
590 return;
591 }
592 };
593
594 let mut interval = tokio::time::interval(poll_interval);
595 info!(
596 "HTTP polling started with interval {}ms",
597 poll_interval.as_millis()
598 );
599
600 loop {
601 interval.tick().await;
602
603 if !*running.lock() {
604 break;
605 }
606
607 for endpoint in &poll_endpoints {
608 let url = format!("{}{}", base_url, endpoint);
609
610 match client.get(&url).send().await {
611 Ok(response) => {
612 if response.status().is_success() {
613 if let Ok(json) =
614 response.json::<serde_json::Value>().await
615 {
616 if let Some(signals_arr) =
618 json.get("signals").and_then(|s| s.as_array())
619 {
620 for signal in signals_arr {
621 if let (Some(addr), Some(val)) = (
622 signal
623 .get("address")
624 .and_then(|a| a.as_str()),
625 signal.get("value"),
626 ) {
627 let clasp_addr =
628 format!("{}{}", namespace, addr);
629 let value = HttpBridge::json_to_value(
630 val.clone(),
631 );
632
633 let changed = {
635 let current = signals.read();
636 current.get(&clasp_addr)
637 != Some(&value)
638 };
639
640 if changed {
641 signals.write().insert(
642 clasp_addr.clone(),
643 value.clone(),
644 );
645
646 let msg =
647 Message::Set(SetMessage {
648 address: clasp_addr,
649 value,
650 revision: None,
651 lock: false,
652 unlock: false,
653 });
654
655 if let Err(e) = tx_clone
656 .send(BridgeEvent::ToClasp(
657 Box::new(msg),
658 ))
659 .await
660 {
661 debug!("Failed to send polled data: {}", e);
662 }
663 }
664 }
665 }
666 } else if let Some(value) = json.get("value") {
667 if let Some(addr) =
669 json.get("address").and_then(|a| a.as_str())
670 {
671 let clasp_addr =
672 format!("{}{}", namespace, addr);
673 let value = HttpBridge::json_to_value(
674 value.clone(),
675 );
676
677 let changed = {
678 let current = signals.read();
679 current.get(&clasp_addr) != Some(&value)
680 };
681
682 if changed {
683 signals.write().insert(
684 clasp_addr.clone(),
685 value.clone(),
686 );
687
688 let msg = Message::Set(SetMessage {
689 address: clasp_addr,
690 value,
691 revision: None,
692 lock: false,
693 unlock: false,
694 });
695
696 let _ = tx_clone
697 .send(BridgeEvent::ToClasp(
698 Box::new(msg),
699 ))
700 .await;
701 }
702 }
703 }
704 }
705 }
706 }
707 Err(e) => {
708 debug!("HTTP poll error for {}: {}", url, e);
709 }
710 }
711 }
712 }
713
714 info!("HTTP polling stopped");
715 });
716 }
717
718 info!(
719 "HTTP bridge started in client mode for {}",
720 self.http_config.url
721 );
722 }
723 }
724
725 Ok(rx)
726 }
727
728 async fn stop(&mut self) -> Result<()> {
729 *self.running.lock() = false;
730 if let Some(tx) = self.shutdown_tx.take() {
731 let _ = tx.send(()).await;
732 }
733 info!("HTTP bridge stopped");
734 Ok(())
735 }
736
737 async fn send(&self, msg: Message) -> Result<()> {
738 if !*self.running.lock() {
739 return Err(BridgeError::Other("Not connected".to_string()));
740 }
741
742 match self.http_config.mode {
743 HttpMode::Server => {
744 match &msg {
746 Message::Set(set) => {
747 self.signals
748 .write()
749 .insert(set.address.clone(), set.value.clone());
750 }
751 Message::Publish(pub_msg) => {
752 if let Some(value) = &pub_msg.value {
753 self.signals
754 .write()
755 .insert(pub_msg.address.clone(), value.clone());
756 }
757 }
758 _ => {}
759 }
760 debug!("HTTP server cached CLASP message");
761 Ok(())
762 }
763 HttpMode::Client => {
764 let (address, value, method) = match &msg {
766 Message::Set(set) => (&set.address, &set.value, HttpMethod::PUT),
767 Message::Publish(pub_msg) => {
768 if let Some(val) = &pub_msg.value {
769 (&pub_msg.address, val, HttpMethod::POST)
770 } else {
771 return Ok(());
772 }
773 }
774 _ => return Ok(()),
775 };
776
777 let url = format!("{}{}", self.http_config.url, address);
778 let body = Self::value_to_json(value);
779
780 let client = reqwest::Client::builder()
781 .timeout(std::time::Duration::from_secs(
782 self.http_config.timeout_secs as u64,
783 ))
784 .build()
785 .map_err(|e| BridgeError::Other(format!("HTTP client error: {}", e)))?;
786
787 let request = match method {
788 HttpMethod::GET => client.get(&url),
789 HttpMethod::POST => client.post(&url).json(&body),
790 HttpMethod::PUT => client.put(&url).json(&body),
791 HttpMethod::DELETE => client.delete(&url),
792 HttpMethod::PATCH => client.patch(&url).json(&body),
793 };
794
795 let response = request
796 .send()
797 .await
798 .map_err(|e| BridgeError::Other(format!("HTTP request failed: {}", e)))?;
799
800 debug!("HTTP {} {} -> {}", method, url, response.status());
801 Ok(())
802 }
803 }
804 }
805
806 fn is_running(&self) -> bool {
807 *self.running.lock()
808 }
809
810 fn namespace(&self) -> &str {
811 &self.http_config.namespace
812 }
813}
814
815impl std::fmt::Display for HttpMethod {
816 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
817 match self {
818 HttpMethod::GET => write!(f, "GET"),
819 HttpMethod::POST => write!(f, "POST"),
820 HttpMethod::PUT => write!(f, "PUT"),
821 HttpMethod::DELETE => write!(f, "DELETE"),
822 HttpMethod::PATCH => write!(f, "PATCH"),
823 }
824 }
825}
826
827#[cfg(test)]
828mod tests {
829 use super::*;
830
831 #[test]
832 fn test_config_default() {
833 let config = HttpBridgeConfig::default();
834 assert!(config.cors_enabled);
835 assert!(!config.endpoints.is_empty());
836 }
837
838 #[test]
839 fn test_value_conversion() {
840 let json = serde_json::json!({
841 "intensity": 0.75,
842 "enabled": true,
843 "name": "main light"
844 });
845
846 let value = HttpBridge::json_to_value(json.clone());
847 let back = HttpBridge::value_to_json(&value);
848
849 assert_eq!(json, back);
850 }
851}