Skip to main content

tidepool_server/
webhook_runtime.rs

//! Per-process webhook runtime: registry + background delivery tasks.
//!
//! Wraps the service-layer `WebhookRegistry` trait with a
//! `JoinHandle`-tracking layer so CRUD operations can also manage the
//! lifecycle of the matching polling task. The server context holds one
//! of these.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use async_trait::async_trait;
13use serde_json::Value;
14use tokio::sync::Mutex;
15
16use tidepool_rpc::upstream::UpstreamClient;
17use tidepool_rpc::webhooks::{
18    spawn_delivery_task, MemoryWebhookRegistry, PostClient, Webhook, WebhookInput, WebhookRegistry,
19    WebhookResult,
20};
21
22/// Default-backed `PostClient` — reqwest under the hood. Placed in
23/// the server crate so the service layer stays reqwest-free.
24pub struct ReqwestPostClient {
25    client: reqwest::Client,
26}
27
28impl ReqwestPostClient {
29    #[must_use]
30    pub fn new(timeout: Duration) -> Self {
31        let client = reqwest::Client::builder()
32            .timeout(timeout)
33            .build()
34            .expect("build reqwest client");
35        Self { client }
36    }
37}
38
39#[async_trait]
40impl PostClient for ReqwestPostClient {
41    async fn post_json(&self, url: &str, auth: Option<&str>, body: &Value) -> Result<(), String> {
42        let mut req = self.client.post(url).json(body);
43        if let Some(h) = auth {
44            req = req.header("Authorization", h);
45        }
46        let resp = req.send().await.map_err(|e| e.to_string())?;
47        if !resp.status().is_success() {
48            return Err(format!("delivery non-2xx: {}", resp.status()));
49        }
50        Ok(())
51    }
52}
53
54/// Lifecycle owner for webhooks. CRUD goes through the underlying
55/// `WebhookRegistry`; task lifecycle (spawn on create, abort on delete)
56/// is mirrored in the parallel `handles` map.
57pub struct WebhookRuntime<U: UpstreamClient + ?Sized + 'static, P: PostClient + ?Sized + 'static> {
58    registry: Arc<dyn WebhookRegistry>,
59    upstream: Arc<U>,
60    poster: Arc<P>,
61    handles: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
62}
63
64impl<U: UpstreamClient + ?Sized + 'static, P: PostClient + ?Sized + 'static> WebhookRuntime<U, P> {
65    pub fn new(registry: Arc<dyn WebhookRegistry>, upstream: Arc<U>, poster: Arc<P>) -> Self {
66        Self {
67            registry,
68            upstream,
69            poster,
70            handles: Arc::new(Mutex::new(HashMap::new())),
71        }
72    }
73
74    /// Convenience: default-backed in-memory registry.
75    pub fn with_memory_registry(upstream: Arc<U>, poster: Arc<P>) -> Self {
76        Self::new(Arc::new(MemoryWebhookRegistry::new()), upstream, poster)
77    }
78
79    pub async fn create(&self, input: WebhookInput) -> WebhookResult<Webhook> {
80        let wh = self.registry.create(input).await?;
81        let handle = spawn_delivery_task(
82            wh.clone(),
83            Arc::clone(&self.upstream),
84            Arc::clone(&self.poster),
85        );
86        self.handles
87            .lock()
88            .await
89            .insert(wh.webhook_id.clone(), handle);
90        Ok(wh)
91    }
92
93    pub async fn list(&self) -> WebhookResult<Vec<Webhook>> {
94        self.registry.list().await
95    }
96
97    pub async fn get(&self, id: &str) -> WebhookResult<Option<Webhook>> {
98        self.registry.get(id).await
99    }
100
101    pub async fn edit(&self, id: &str, input: WebhookInput) -> WebhookResult<Webhook> {
102        let wh = self.registry.edit(id, input).await?;
103        // Restart the delivery task with the new config — existing
104        // cursor state is dropped, same as Helius's behavior when a
105        // webhook is edited.
106        let mut handles = self.handles.lock().await;
107        if let Some(prior) = handles.remove(id) {
108            prior.abort();
109        }
110        let handle = spawn_delivery_task(
111            wh.clone(),
112            Arc::clone(&self.upstream),
113            Arc::clone(&self.poster),
114        );
115        handles.insert(id.to_string(), handle);
116        Ok(wh)
117    }
118
119    pub async fn delete(&self, id: &str) -> WebhookResult<bool> {
120        let removed = self.registry.delete(id).await?;
121        if let Some(handle) = self.handles.lock().await.remove(id) {
122            handle.abort();
123        }
124        Ok(removed)
125    }
126}