Skip to main content

folk_api/
server_plugin.rs

1//! `ServerPlugin` helper trait for the common case: a plugin that runs one
2//! long-lived background task (HTTP server, queue consumer, etc.) and shuts
3//! down when the server signals shutdown.
4//!
5//! Most plugins should implement [`ServerPlugin`] rather than [`Plugin`]
6//! directly, then wrap with [`ServerPluginWrapper`].
7
8use std::sync::{Arc, Mutex};
9
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use tokio::task::JoinHandle;
13use tracing::{error, info, warn};
14
15use crate::context::PluginContext;
16use crate::plugin::Plugin;
17use crate::rpc::RpcMethodDef;
18
19/// Trait for plugins that run a single long-lived background task.
20/// The `run` future should observe `ctx.shutdown` and return when signaled.
21#[async_trait]
22pub trait ServerPlugin: Send + Sync + 'static {
23    /// Stable plugin name (same semantics as [`Plugin::name`]).
24    fn name(&self) -> &'static str;
25
26    /// Run the plugin's main loop. Return `Ok(())` on shutdown.
27    async fn run(&self, ctx: PluginContext) -> Result<()>;
28
29    /// Optional RPC method advertisements (default empty).
30    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
31        Vec::new()
32    }
33}
34
35/// Wraps a [`ServerPlugin`] so it can be used as a [`Plugin`].
36/// Spawns `run` at boot, awaits the task during shutdown.
37pub struct ServerPluginWrapper<S: ServerPlugin> {
38    inner: Arc<S>,
39    handle: Mutex<Option<JoinHandle<Result<()>>>>,
40}
41
42impl<S: ServerPlugin> ServerPluginWrapper<S> {
43    pub fn new(inner: S) -> Self {
44        Self {
45            inner: Arc::new(inner),
46            handle: Mutex::new(None),
47        }
48    }
49}
50
51#[async_trait]
52impl<S: ServerPlugin> Plugin for ServerPluginWrapper<S> {
53    fn name(&self) -> &'static str {
54        self.inner.name()
55    }
56
57    async fn boot(&mut self, ctx: PluginContext) -> Result<()> {
58        let inner = self.inner.clone();
59        let handle = tokio::spawn(async move { inner.run(ctx).await });
60        *self.handle.lock().expect("mutex") = Some(handle);
61        info!(plugin = self.inner.name(), "server plugin booted");
62        Ok(())
63    }
64
65    async fn shutdown(&self) -> Result<()> {
66        let handle = {
67            let mut slot = self
68                .handle
69                .lock()
70                .map_err(|_| anyhow::anyhow!("ServerPluginWrapper handle mutex poisoned"))?;
71            slot.take()
72        };
73
74        let Some(handle) = handle else {
75            warn!(
76                plugin = self.inner.name(),
77                "shutdown called but no handle present"
78            );
79            return Ok(());
80        };
81
82        match handle.await {
83            Ok(Ok(())) => {
84                info!(plugin = self.inner.name(), "run loop completed cleanly");
85                Ok(())
86            }
87            Ok(Err(err)) => {
88                error!(plugin = self.inner.name(), error = ?err, "run loop returned error");
89                Err(err).context("ServerPlugin run loop failed")
90            }
91            Err(join_err) => {
92                error!(plugin = self.inner.name(), error = %join_err, "run loop join failed");
93                Err(anyhow::anyhow!("ServerPlugin task join failed: {join_err}"))
94            }
95        }
96    }
97
98    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
99        self.inner.rpc_methods()
100    }
101}