Skip to main content

folk_api/
server_plugin.rs

1//! `ServerPlugin` helper trait for the common case: a plugin that runs one
2//! long-lived background task (HTTP server, queue consumer, etc.) and shuts
3//! down when the server signals shutdown.
4//!
5//! Most plugins should implement [`ServerPlugin`] rather than [`Plugin`]
6//! directly, then wrap with [`ServerPluginWrapper`].
7
8use std::sync::{Arc, Mutex};
9
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use tokio::task::JoinHandle;
13use tracing::{error, info, warn};
14
15use crate::context::PluginContext;
16use crate::plugin::Plugin;
17use crate::rpc::RpcMethodDef;
18
19/// Trait for plugins that run a single long-lived background task.
20/// The `run` future should observe `ctx.shutdown` and return when signaled.
21#[async_trait]
22pub trait ServerPlugin: Send + Sync + 'static {
23    /// Stable plugin name (same semantics as [`Plugin::name`]).
24    fn name(&self) -> &'static str;
25
26    /// Run the plugin's main loop. Return `Ok(())` on shutdown.
27    ///
28    /// Observe `ctx.shutdown` to detect the server shutdown signal.
29    /// Do **not** discard `watch::RecvError` with `.ok()` — treat it as an
30    /// unexpected condition (the sender was dropped without signaling shutdown)
31    /// and log an error:
32    ///
33    /// ```ignore
34    /// if let Err(e) = ctx.shutdown.changed().await {
35    ///     tracing::error!(error = %e, "shutdown sender dropped unexpectedly");
36    /// }
37    /// ```
38    async fn run(&self, ctx: PluginContext) -> Result<()>;
39
40    /// Optional RPC method advertisements (default empty).
41    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
42        Vec::new()
43    }
44}
45
46/// Wraps a [`ServerPlugin`] so it can be used as a [`Plugin`].
47/// Spawns `run` at boot, awaits the task during shutdown.
48pub struct ServerPluginWrapper<S: ServerPlugin> {
49    inner: Arc<S>,
50    handle: Mutex<Option<JoinHandle<Result<()>>>>,
51}
52
53impl<S: ServerPlugin> ServerPluginWrapper<S> {
54    pub fn new(inner: S) -> Self {
55        Self {
56            inner: Arc::new(inner),
57            handle: Mutex::new(None),
58        }
59    }
60}
61
62#[async_trait]
63impl<S: ServerPlugin> Plugin for ServerPluginWrapper<S> {
64    fn name(&self) -> &'static str {
65        self.inner.name()
66    }
67
68    async fn boot(&mut self, ctx: PluginContext) -> Result<()> {
69        let mut slot = self.handle.lock().expect("mutex");
70        if slot.is_some() {
71            return Err(anyhow::anyhow!(
72                "plugin '{}' is already running; boot() called twice",
73                self.inner.name()
74            ));
75        }
76        let inner = self.inner.clone();
77        let handle = tokio::spawn(async move { inner.run(ctx).await });
78        *slot = Some(handle);
79        info!(plugin = self.inner.name(), "server plugin booted");
80        Ok(())
81    }
82
83    async fn shutdown(&self) -> Result<()> {
84        let handle = {
85            let mut slot = self
86                .handle
87                .lock()
88                .map_err(|_| anyhow::anyhow!("ServerPluginWrapper handle mutex poisoned"))?;
89            slot.take()
90        };
91
92        let Some(handle) = handle else {
93            warn!(
94                plugin = self.inner.name(),
95                "shutdown called but no handle present"
96            );
97            return Ok(());
98        };
99
100        // Abort ensures the task stops even if it is not observing ctx.shutdown.
101        // The shutdown signal was already broadcast before shutdown() is called,
102        // so cooperative tasks have had a chance to react; abort() is the fallback.
103        // Without this, a stuck task causes handle.await to deadlock forever.
104        handle.abort();
105
106        match handle.await {
107            Ok(Ok(())) => {
108                info!(plugin = self.inner.name(), "run loop completed cleanly");
109                Ok(())
110            }
111            Ok(Err(err)) => {
112                error!(plugin = self.inner.name(), error = ?err, "run loop returned error");
113                Err(err).context("ServerPlugin run loop failed")
114            }
115            Err(join_err) if join_err.is_cancelled() => {
116                info!(
117                    plugin = self.inner.name(),
118                    "run loop cancelled during shutdown"
119                );
120                Ok(())
121            }
122            Err(join_err) => {
123                error!(plugin = self.inner.name(), error = %join_err, "run loop join failed");
124                Err(anyhow::anyhow!("ServerPlugin task join failed: {join_err}"))
125            }
126        }
127    }
128
129    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
130        self.inner.rpc_methods()
131    }
132}