1use crate::{Bridge, BridgeConfig, BridgeError, BridgeEvent, Result};
8use async_trait::async_trait;
9use axum::{
10 extract::{Path, State},
11 http::StatusCode,
12 response::{IntoResponse, Json},
13 routing::{delete, get},
14 Router,
15};
16use clasp_core::{Message, PublishMessage, SetMessage, SignalType, Value};
17use parking_lot::Mutex;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::net::SocketAddr;
21use std::sync::Arc;
22use tokio::sync::mpsc;
23use tower_http::cors::{Any, CorsLayer};
24use tower_http::trace::TraceLayer;
25use tracing::{debug, error, info};
26
/// HTTP request methods understood by the bridge.
///
/// Serde (de)serializes the variants in upper case (`"GET"`, `"POST"`, ...).
/// `GET` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "UPPERCASE")]
pub enum HttpMethod {
    /// `GET` (the default).
    #[default]
    GET,
    /// `POST`; used by client-mode `send` for publish messages.
    POST,
    /// `PUT`; used by client-mode `send` for set messages.
    PUT,
    /// `DELETE`.
    DELETE,
    /// `PATCH`.
    PATCH,
}
38
/// Operating mode of the HTTP bridge.
///
/// Serde (de)serializes the variants in lower case (`"server"`, `"client"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum HttpMode {
    /// The bridge binds a listener and serves the REST routes (the default).
    #[default]
    Server,
    /// The bridge issues outgoing HTTP requests and optionally polls endpoints.
    Client,
}
49
/// Description of a single REST endpoint and the CLASP address it maps to.
///
/// NOTE(review): the router built in this file registers a fixed set of routes
/// and does not read these entries — confirm whether they are consumed elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndpointConfig {
    /// URL path pattern of the endpoint (e.g. `/signals/*path`).
    pub path: String,
    /// HTTP method of the endpoint; `GET` when omitted.
    #[serde(default)]
    pub method: HttpMethod,
    /// CLASP address (pattern) associated with the endpoint.
    pub clasp_address: String,
    /// Optional human-readable description.
    #[serde(default)]
    pub description: Option<String>,
    /// Whether the endpoint is enabled; `true` when omitted.
    #[serde(default = "endpoint_enabled_default")]
    pub enabled: bool,
    /// Optional scope name associated with the endpoint.
    // NOTE(review): presumably an authorization scope; no check is visible in this file.
    #[serde(default)]
    pub required_scope: Option<String>,
    /// Rate limit value; `0` when omitted.
    // NOTE(review): unit and the meaning of 0 (likely "unlimited") are not shown here — confirm.
    #[serde(default)]
    pub rate_limit: u32,
}
73
/// Serde default for `EndpointConfig::enabled`: endpoints are enabled unless
/// the configuration says otherwise.
fn endpoint_enabled_default() -> bool {
    true
}
77
/// Configuration of the HTTP bridge.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpBridgeConfig {
    /// Operating mode; `Server` when omitted.
    #[serde(default)]
    pub mode: HttpMode,
    /// In server mode: the socket address to bind (parsed as a `SocketAddr`).
    /// In client mode: the base URL that request paths are appended to.
    pub url: String,
    /// Endpoint descriptions; empty when omitted.
    #[serde(default)]
    pub endpoints: Vec<EndpointConfig>,
    /// Whether a CORS layer is added to the server router; `true` when omitted.
    #[serde(default = "default_true")]
    pub cors_enabled: bool,
    /// Origins intended for the CORS allow-list; empty when omitted.
    #[serde(default)]
    pub cors_origins: Vec<String>,
    /// Path prefix under which the server routes are registered; `/api` when omitted.
    #[serde(default = "default_base_path")]
    pub base_path: String,
    /// Timeout, in seconds, applied to outgoing HTTP requests; `30` when omitted.
    #[serde(default = "default_timeout")]
    pub timeout_secs: u32,
    /// Prefix prepended to addresses of signals that originate from HTTP;
    /// `/http` when omitted.
    #[serde(default = "default_namespace")]
    pub namespace: String,
    /// Client-mode polling interval in milliseconds; `0` (the default) disables polling.
    #[serde(default)]
    pub poll_interval_ms: u64,
    /// Paths appended to `url` when polling in client mode; when empty,
    /// `/api/signals` is polled.
    #[serde(default)]
    pub poll_endpoints: Vec<String>,
}
112
/// Serde default helper for boolean fields that should be on when omitted.
fn default_true() -> bool {
    true
}
116
/// Serde default for `HttpBridgeConfig::base_path`.
fn default_base_path() -> String {
    String::from("/api")
}
120
/// Serde default for `HttpBridgeConfig::timeout_secs`, in seconds.
fn default_timeout() -> u32 {
    const DEFAULT_TIMEOUT_SECS: u32 = 30;
    DEFAULT_TIMEOUT_SECS
}
124
/// Serde default for `HttpBridgeConfig::namespace`.
fn default_namespace() -> String {
    String::from("/http")
}
128
129impl Default for HttpBridgeConfig {
130 fn default() -> Self {
131 Self {
132 mode: HttpMode::Server,
133 url: "0.0.0.0:3000".to_string(),
134 endpoints: vec![
135 EndpointConfig {
136 path: "/signals".to_string(),
137 method: HttpMethod::GET,
138 clasp_address: "/**".to_string(),
139 description: Some("List all signals".to_string()),
140 enabled: true,
141 required_scope: None,
142 rate_limit: 0,
143 },
144 EndpointConfig {
145 path: "/signals/*path".to_string(),
146 method: HttpMethod::GET,
147 clasp_address: "/{path}".to_string(),
148 description: Some("Get signal value".to_string()),
149 enabled: true,
150 required_scope: None,
151 rate_limit: 0,
152 },
153 EndpointConfig {
154 path: "/signals/*path".to_string(),
155 method: HttpMethod::PUT,
156 clasp_address: "/{path}".to_string(),
157 description: Some("Set signal value".to_string()),
158 enabled: true,
159 required_scope: Some("write".to_string()),
160 rate_limit: 0,
161 },
162 EndpointConfig {
163 path: "/signals/*path".to_string(),
164 method: HttpMethod::POST,
165 clasp_address: "/{path}".to_string(),
166 description: Some("Publish event".to_string()),
167 enabled: true,
168 required_scope: Some("write".to_string()),
169 rate_limit: 0,
170 },
171 ],
172 cors_enabled: true,
173 cors_origins: vec![],
174 base_path: "/api".to_string(),
175 timeout_secs: 30,
176 namespace: "/http".to_string(),
177 poll_interval_ms: 0,
178 poll_endpoints: vec![],
179 }
180 }
181}
182
/// State handed to the axum handlers via `State<AppState>`.
#[derive(Clone)]
struct AppState {
    /// Channel on which handlers emit bridge events.
    event_tx: mpsc::Sender<BridgeEvent>,
    /// Signal cache (address -> last known value), shared with the bridge.
    signals: Arc<parking_lot::RwLock<HashMap<String, Value>>>,
    /// Prefix prepended to request paths by the write handlers.
    namespace: String,
}
190
/// Bridge between HTTP and CLASP, operating as a server or a client
/// depending on `HttpBridgeConfig::mode`.
pub struct HttpBridge {
    /// Generic bridge configuration (name, protocol, direction).
    config: BridgeConfig,
    /// HTTP-specific configuration.
    http_config: HttpBridgeConfig,
    /// Running flag, shared with the tasks spawned by `start`.
    running: Arc<Mutex<bool>>,
    /// Sender used by `stop` to request shutdown; `None` until `start` is called.
    shutdown_tx: Option<mpsc::Sender<()>>,
    /// Signal cache (address -> last known value).
    signals: Arc<parking_lot::RwLock<HashMap<String, Value>>>,
}
199
200impl HttpBridge {
201 pub fn new(http_config: HttpBridgeConfig) -> Self {
203 let config = BridgeConfig {
204 name: "HTTP Bridge".to_string(),
205 protocol: "http".to_string(),
206 bidirectional: true,
207 ..Default::default()
208 };
209
210 Self {
211 config,
212 http_config,
213 running: Arc::new(Mutex::new(false)),
214 shutdown_tx: None,
215 signals: Arc::new(parking_lot::RwLock::new(HashMap::new())),
216 }
217 }
218
219 fn json_to_value(json: serde_json::Value) -> Value {
221 match json {
222 serde_json::Value::Null => Value::Null,
223 serde_json::Value::Bool(b) => Value::Bool(b),
224 serde_json::Value::Number(n) => {
225 if let Some(i) = n.as_i64() {
226 Value::Int(i)
227 } else if let Some(f) = n.as_f64() {
228 Value::Float(f)
229 } else {
230 Value::Null
231 }
232 }
233 serde_json::Value::String(s) => Value::String(s),
234 serde_json::Value::Array(arr) => {
235 Value::Array(arr.into_iter().map(Self::json_to_value).collect())
236 }
237 serde_json::Value::Object(obj) => {
238 let map: HashMap<String, Value> = obj
239 .into_iter()
240 .map(|(k, v)| (k, Self::json_to_value(v)))
241 .collect();
242 Value::Map(map)
243 }
244 }
245 }
246
247 fn value_to_json(value: &Value) -> serde_json::Value {
249 match value {
250 Value::Null => serde_json::Value::Null,
251 Value::Bool(b) => serde_json::Value::Bool(*b),
252 Value::Int(i) => serde_json::Value::Number((*i).into()),
253 Value::Float(f) => serde_json::Number::from_f64(*f)
254 .map(serde_json::Value::Number)
255 .unwrap_or(serde_json::Value::Null),
256 Value::String(s) => serde_json::Value::String(s.clone()),
257 Value::Bytes(b) => serde_json::Value::Array(
258 b.iter()
259 .map(|&x| serde_json::Value::Number(x.into()))
260 .collect(),
261 ),
262 Value::Array(arr) => {
263 serde_json::Value::Array(arr.iter().map(Self::value_to_json).collect())
264 }
265 Value::Map(m) => serde_json::Value::Object(
266 m.iter()
267 .map(|(k, v)| (k.clone(), Self::value_to_json(v)))
268 .collect(),
269 ),
270 }
271 }
272
273 fn build_router(state: AppState, base_path: &str) -> Router {
275 Router::new()
276 .route(&format!("{}/signals", base_path), get(list_signals))
277 .route(
278 &format!("{}/*path", base_path),
279 get(get_signal)
280 .put(set_signal)
281 .post(publish_event)
282 .delete(delete_signal),
283 )
284 .route(&format!("{}/health", base_path), get(health_check))
285 .layer(TraceLayer::new_for_http())
286 .with_state(state)
287 }
288
289 pub fn update_signal(&self, address: &str, value: Value) {
291 self.signals.write().insert(address.to_string(), value);
292 }
293}
294
295async fn health_check() -> impl IntoResponse {
298 Json(serde_json::json!({
299 "status": "ok",
300 "protocol": "CLASP",
301 "version": "0.1.0"
302 }))
303}
304
305async fn list_signals(State(state): State<AppState>) -> impl IntoResponse {
306 let signals = state.signals.read();
307 let list: Vec<serde_json::Value> = signals
308 .iter()
309 .map(|(addr, val)| {
310 serde_json::json!({
311 "address": addr,
312 "value": HttpBridge::value_to_json(val)
313 })
314 })
315 .collect();
316
317 Json(serde_json::json!({
318 "signals": list,
319 "count": list.len()
320 }))
321}
322
323async fn get_signal(State(state): State<AppState>, Path(path): Path<String>) -> impl IntoResponse {
324 let address = format!("/{}", path);
325 let signals = state.signals.read();
326
327 if let Some(value) = signals.get(&address) {
328 Json(serde_json::json!({
329 "address": address,
330 "value": HttpBridge::value_to_json(value)
331 }))
332 .into_response()
333 } else {
334 (
335 StatusCode::NOT_FOUND,
336 Json(serde_json::json!({
337 "error": "Signal not found",
338 "address": address
339 })),
340 )
341 .into_response()
342 }
343}
344
345async fn set_signal(
346 State(state): State<AppState>,
347 Path(path): Path<String>,
348 Json(body): Json<serde_json::Value>,
349) -> impl IntoResponse {
350 let address = format!("{}/{}", state.namespace, path);
351
352 let value = if let Some(v) = body.get("value") {
353 HttpBridge::json_to_value(v.clone())
354 } else {
355 HttpBridge::json_to_value(body)
356 };
357
358 state.signals.write().insert(address.clone(), value.clone());
360
361 let msg = Message::Set(SetMessage {
363 address: address.clone(),
364 value: value.clone(),
365 revision: None,
366 lock: false,
367 unlock: false,
368 });
369
370 if let Err(e) = state.event_tx.send(BridgeEvent::ToClasp(msg)).await {
371 error!("Failed to send set event: {}", e);
372 return (
373 StatusCode::INTERNAL_SERVER_ERROR,
374 Json(serde_json::json!({ "error": "Internal error" })),
375 )
376 .into_response();
377 }
378
379 Json(serde_json::json!({
380 "address": address,
381 "value": HttpBridge::value_to_json(&value),
382 "status": "set"
383 }))
384 .into_response()
385}
386
387async fn publish_event(
388 State(state): State<AppState>,
389 Path(path): Path<String>,
390 Json(body): Json<serde_json::Value>,
391) -> impl IntoResponse {
392 let address = format!("{}/{}", state.namespace, path);
393
394 let value = if let Some(v) = body.get("value") {
395 HttpBridge::json_to_value(v.clone())
396 } else {
397 HttpBridge::json_to_value(body)
398 };
399
400 let msg = Message::Publish(PublishMessage {
402 address: address.clone(),
403 signal: Some(SignalType::Event),
404 value: Some(value.clone()),
405 payload: None,
406 samples: None,
407 rate: None,
408 id: None,
409 phase: None,
410 timestamp: None,
411 timeline: None,
412 });
413
414 if let Err(e) = state.event_tx.send(BridgeEvent::ToClasp(msg)).await {
415 error!("Failed to send publish event: {}", e);
416 return (
417 StatusCode::INTERNAL_SERVER_ERROR,
418 Json(serde_json::json!({ "error": "Internal error" })),
419 )
420 .into_response();
421 }
422
423 Json(serde_json::json!({
424 "address": address,
425 "value": HttpBridge::value_to_json(&value),
426 "status": "published"
427 }))
428 .into_response()
429}
430
431async fn delete_signal(
432 State(state): State<AppState>,
433 Path(path): Path<String>,
434) -> impl IntoResponse {
435 let address = format!("{}/{}", state.namespace, path);
436
437 let removed = state.signals.write().remove(&address);
439
440 if removed.is_some() {
441 let msg = Message::Set(SetMessage {
443 address: address.clone(),
444 value: Value::Null,
445 revision: None,
446 lock: false,
447 unlock: false,
448 });
449
450 let _ = state.event_tx.send(BridgeEvent::ToClasp(msg)).await;
451
452 Json(serde_json::json!({
453 "address": address,
454 "status": "deleted"
455 }))
456 .into_response()
457 } else {
458 (
459 StatusCode::NOT_FOUND,
460 Json(serde_json::json!({
461 "error": "Signal not found",
462 "address": address
463 })),
464 )
465 .into_response()
466 }
467}
468
469#[async_trait]
470impl Bridge for HttpBridge {
471 fn config(&self) -> &BridgeConfig {
472 &self.config
473 }
474
475 async fn start(&mut self) -> Result<mpsc::Receiver<BridgeEvent>> {
476 if *self.running.lock() {
477 return Err(BridgeError::Other("Bridge already running".to_string()));
478 }
479
480 let (tx, rx) = mpsc::channel(100);
481 let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
482 self.shutdown_tx = Some(shutdown_tx);
483
484 match self.http_config.mode {
485 HttpMode::Server => {
486 let addr: SocketAddr = self
487 .http_config
488 .url
489 .parse()
490 .map_err(|e| BridgeError::Other(format!("Invalid address: {}", e)))?;
491
492 let app_state = AppState {
493 event_tx: tx.clone(),
494 signals: self.signals.clone(),
495 namespace: self.http_config.namespace.clone(),
496 };
497
498 let mut router = Self::build_router(app_state, &self.http_config.base_path);
499
500 if self.http_config.cors_enabled {
502 let cors = CorsLayer::new()
503 .allow_origin(Any)
504 .allow_methods(Any)
505 .allow_headers(Any);
506 router = router.layer(cors);
507 }
508
509 let running = self.running.clone();
510 let tx_clone = tx.clone();
511
512 tokio::spawn(async move {
513 let listener = match tokio::net::TcpListener::bind(addr).await {
514 Ok(l) => l,
515 Err(e) => {
516 error!("Failed to bind HTTP server: {}", e);
517 let _ = tx_clone
518 .send(BridgeEvent::Error(format!("Bind failed: {}", e)))
519 .await;
520 return;
521 }
522 };
523
524 info!("HTTP server listening on {}", addr);
525 *running.lock() = true;
526 let _ = tx_clone.send(BridgeEvent::Connected).await;
527
528 axum::serve(listener, router)
529 .with_graceful_shutdown(async move {
530 let _ = shutdown_rx.recv().await;
531 })
532 .await
533 .ok();
534
535 *running.lock() = false;
536 let _ = tx_clone
537 .send(BridgeEvent::Disconnected {
538 reason: Some("Server stopped".to_string()),
539 })
540 .await;
541 info!("HTTP server stopped");
542 });
543
544 *self.running.lock() = true;
545 info!(
546 "HTTP bridge started in server mode on {}",
547 self.http_config.url
548 );
549 }
550 HttpMode::Client => {
551 *self.running.lock() = true;
553 let _ = tx.send(BridgeEvent::Connected).await;
554
555 if self.http_config.poll_interval_ms > 0 {
557 let poll_interval =
558 std::time::Duration::from_millis(self.http_config.poll_interval_ms);
559 let base_url = self.http_config.url.clone();
560 let timeout_secs = self.http_config.timeout_secs;
561 let namespace = self.http_config.namespace.clone();
562 let poll_endpoints = if self.http_config.poll_endpoints.is_empty() {
563 vec!["/api/signals".to_string()]
564 } else {
565 self.http_config.poll_endpoints.clone()
566 };
567 let signals = self.signals.clone();
568 let running = self.running.clone();
569 let tx_clone = tx.clone();
570
571 tokio::spawn(async move {
572 let client = match reqwest::Client::builder()
573 .timeout(std::time::Duration::from_secs(timeout_secs as u64))
574 .build()
575 {
576 Ok(c) => c,
577 Err(e) => {
578 error!("Failed to create HTTP client: {}", e);
579 return;
580 }
581 };
582
583 let mut interval = tokio::time::interval(poll_interval);
584 info!(
585 "HTTP polling started with interval {}ms",
586 poll_interval.as_millis()
587 );
588
589 loop {
590 interval.tick().await;
591
592 if !*running.lock() {
593 break;
594 }
595
596 for endpoint in &poll_endpoints {
597 let url = format!("{}{}", base_url, endpoint);
598
599 match client.get(&url).send().await {
600 Ok(response) => {
601 if response.status().is_success() {
602 if let Ok(json) =
603 response.json::<serde_json::Value>().await
604 {
605 if let Some(signals_arr) =
607 json.get("signals").and_then(|s| s.as_array())
608 {
609 for signal in signals_arr {
610 if let (Some(addr), Some(val)) = (
611 signal
612 .get("address")
613 .and_then(|a| a.as_str()),
614 signal.get("value"),
615 ) {
616 let clasp_addr =
617 format!("{}{}", namespace, addr);
618 let value = HttpBridge::json_to_value(
619 val.clone(),
620 );
621
622 let changed = {
624 let current = signals.read();
625 current.get(&clasp_addr)
626 != Some(&value)
627 };
628
629 if changed {
630 signals.write().insert(
631 clasp_addr.clone(),
632 value.clone(),
633 );
634
635 let msg =
636 Message::Set(SetMessage {
637 address: clasp_addr,
638 value,
639 revision: None,
640 lock: false,
641 unlock: false,
642 });
643
644 if let Err(e) = tx_clone
645 .send(BridgeEvent::ToClasp(msg))
646 .await
647 {
648 debug!("Failed to send polled data: {}", e);
649 }
650 }
651 }
652 }
653 } else if let Some(value) = json.get("value") {
654 if let Some(addr) =
656 json.get("address").and_then(|a| a.as_str())
657 {
658 let clasp_addr =
659 format!("{}{}", namespace, addr);
660 let value = HttpBridge::json_to_value(
661 value.clone(),
662 );
663
664 let changed = {
665 let current = signals.read();
666 current.get(&clasp_addr) != Some(&value)
667 };
668
669 if changed {
670 signals.write().insert(
671 clasp_addr.clone(),
672 value.clone(),
673 );
674
675 let msg = Message::Set(SetMessage {
676 address: clasp_addr,
677 value,
678 revision: None,
679 lock: false,
680 unlock: false,
681 });
682
683 let _ = tx_clone
684 .send(BridgeEvent::ToClasp(msg))
685 .await;
686 }
687 }
688 }
689 }
690 }
691 }
692 Err(e) => {
693 debug!("HTTP poll error for {}: {}", url, e);
694 }
695 }
696 }
697 }
698
699 info!("HTTP polling stopped");
700 });
701 }
702
703 info!(
704 "HTTP bridge started in client mode for {}",
705 self.http_config.url
706 );
707 }
708 }
709
710 Ok(rx)
711 }
712
713 async fn stop(&mut self) -> Result<()> {
714 *self.running.lock() = false;
715 if let Some(tx) = self.shutdown_tx.take() {
716 let _ = tx.send(()).await;
717 }
718 info!("HTTP bridge stopped");
719 Ok(())
720 }
721
722 async fn send(&self, msg: Message) -> Result<()> {
723 if !*self.running.lock() {
724 return Err(BridgeError::Other("Not connected".to_string()));
725 }
726
727 match self.http_config.mode {
728 HttpMode::Server => {
729 match &msg {
731 Message::Set(set) => {
732 self.signals
733 .write()
734 .insert(set.address.clone(), set.value.clone());
735 }
736 Message::Publish(pub_msg) => {
737 if let Some(value) = &pub_msg.value {
738 self.signals
739 .write()
740 .insert(pub_msg.address.clone(), value.clone());
741 }
742 }
743 _ => {}
744 }
745 debug!("HTTP server cached CLASP message");
746 Ok(())
747 }
748 HttpMode::Client => {
749 let (address, value, method) = match &msg {
751 Message::Set(set) => (&set.address, &set.value, HttpMethod::PUT),
752 Message::Publish(pub_msg) => {
753 if let Some(val) = &pub_msg.value {
754 (&pub_msg.address, val, HttpMethod::POST)
755 } else {
756 return Ok(());
757 }
758 }
759 _ => return Ok(()),
760 };
761
762 let url = format!("{}{}", self.http_config.url, address);
763 let body = Self::value_to_json(value);
764
765 let client = reqwest::Client::builder()
766 .timeout(std::time::Duration::from_secs(
767 self.http_config.timeout_secs as u64,
768 ))
769 .build()
770 .map_err(|e| BridgeError::Other(format!("HTTP client error: {}", e)))?;
771
772 let request = match method {
773 HttpMethod::GET => client.get(&url),
774 HttpMethod::POST => client.post(&url).json(&body),
775 HttpMethod::PUT => client.put(&url).json(&body),
776 HttpMethod::DELETE => client.delete(&url),
777 HttpMethod::PATCH => client.patch(&url).json(&body),
778 };
779
780 let response = request
781 .send()
782 .await
783 .map_err(|e| BridgeError::Other(format!("HTTP request failed: {}", e)))?;
784
785 debug!("HTTP {} {} -> {}", method, url, response.status());
786 Ok(())
787 }
788 }
789 }
790
791 fn is_running(&self) -> bool {
792 *self.running.lock()
793 }
794
795 fn namespace(&self) -> &str {
796 &self.http_config.namespace
797 }
798}
799
800impl std::fmt::Display for HttpMethod {
801 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
802 match self {
803 HttpMethod::GET => write!(f, "GET"),
804 HttpMethod::POST => write!(f, "POST"),
805 HttpMethod::PUT => write!(f, "PUT"),
806 HttpMethod::DELETE => write!(f, "DELETE"),
807 HttpMethod::PATCH => write!(f, "PATCH"),
808 }
809 }
810}
811
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration enables CORS and ships with endpoints.
    #[test]
    fn test_config_default() {
        let defaults = HttpBridgeConfig::default();
        assert!(defaults.cors_enabled);
        assert!(!defaults.endpoints.is_empty());
    }

    /// JSON -> CLASP value -> JSON must round-trip floats, booleans and strings.
    #[test]
    fn test_value_conversion() {
        let original = serde_json::json!({
            "intensity": 0.75,
            "enabled": true,
            "name": "main light"
        });

        let as_value = HttpBridge::json_to_value(original.clone());
        let round_tripped = HttpBridge::value_to_json(&as_value);

        assert_eq!(original, round_tripped);
    }
}