tidepool_server/
webhook_runtime.rs1use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use serde_json::Value;
14use tokio::sync::Mutex;
15
16use tidepool_rpc::upstream::UpstreamClient;
17use tidepool_rpc::webhooks::{
18 spawn_delivery_task, MemoryWebhookRegistry, PostClient, Webhook, WebhookInput, WebhookRegistry,
19 WebhookResult,
20};
21
22pub struct ReqwestPostClient {
25 client: reqwest::Client,
26}
27
28impl ReqwestPostClient {
29 #[must_use]
30 pub fn new(timeout: Duration) -> Self {
31 let client = reqwest::Client::builder()
32 .timeout(timeout)
33 .build()
34 .expect("build reqwest client");
35 Self { client }
36 }
37}
38
39#[async_trait]
40impl PostClient for ReqwestPostClient {
41 async fn post_json(&self, url: &str, auth: Option<&str>, body: &Value) -> Result<(), String> {
42 let mut req = self.client.post(url).json(body);
43 if let Some(h) = auth {
44 req = req.header("Authorization", h);
45 }
46 let resp = req.send().await.map_err(|e| e.to_string())?;
47 if !resp.status().is_success() {
48 return Err(format!("delivery non-2xx: {}", resp.status()));
49 }
50 Ok(())
51 }
52}
53
54pub struct WebhookRuntime<U: UpstreamClient + ?Sized + 'static, P: PostClient + ?Sized + 'static> {
58 registry: Arc<dyn WebhookRegistry>,
59 upstream: Arc<U>,
60 poster: Arc<P>,
61 handles: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
62}
63
64impl<U: UpstreamClient + ?Sized + 'static, P: PostClient + ?Sized + 'static> WebhookRuntime<U, P> {
65 pub fn new(registry: Arc<dyn WebhookRegistry>, upstream: Arc<U>, poster: Arc<P>) -> Self {
66 Self {
67 registry,
68 upstream,
69 poster,
70 handles: Arc::new(Mutex::new(HashMap::new())),
71 }
72 }
73
74 pub fn with_memory_registry(upstream: Arc<U>, poster: Arc<P>) -> Self {
76 Self::new(Arc::new(MemoryWebhookRegistry::new()), upstream, poster)
77 }
78
79 pub async fn create(&self, input: WebhookInput) -> WebhookResult<Webhook> {
80 let wh = self.registry.create(input).await?;
81 let handle = spawn_delivery_task(
82 wh.clone(),
83 Arc::clone(&self.upstream),
84 Arc::clone(&self.poster),
85 );
86 self.handles
87 .lock()
88 .await
89 .insert(wh.webhook_id.clone(), handle);
90 Ok(wh)
91 }
92
93 pub async fn list(&self) -> WebhookResult<Vec<Webhook>> {
94 self.registry.list().await
95 }
96
97 pub async fn get(&self, id: &str) -> WebhookResult<Option<Webhook>> {
98 self.registry.get(id).await
99 }
100
101 pub async fn edit(&self, id: &str, input: WebhookInput) -> WebhookResult<Webhook> {
102 let wh = self.registry.edit(id, input).await?;
103 let mut handles = self.handles.lock().await;
107 if let Some(prior) = handles.remove(id) {
108 prior.abort();
109 }
110 let handle = spawn_delivery_task(
111 wh.clone(),
112 Arc::clone(&self.upstream),
113 Arc::clone(&self.poster),
114 );
115 handles.insert(id.to_string(), handle);
116 Ok(wh)
117 }
118
119 pub async fn delete(&self, id: &str) -> WebhookResult<bool> {
120 let removed = self.registry.delete(id).await?;
121 if let Some(handle) = self.handles.lock().await.remove(id) {
122 handle.abort();
123 }
124 Ok(removed)
125 }
126}