folk_api/
server_plugin.rs1use std::sync::{Arc, Mutex};
9
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use tokio::task::JoinHandle;
13use tracing::{error, info, warn};
14
15use crate::context::PluginContext;
16use crate::plugin::Plugin;
17use crate::rpc::RpcMethodDef;
18
19#[async_trait]
22pub trait ServerPlugin: Send + Sync + 'static {
23 fn name(&self) -> &'static str;
25
26 async fn run(&self, ctx: PluginContext) -> Result<()>;
28
29 fn rpc_methods(&self) -> Vec<RpcMethodDef> {
31 Vec::new()
32 }
33}
34
35pub struct ServerPluginWrapper<S: ServerPlugin> {
38 inner: Arc<S>,
39 handle: Mutex<Option<JoinHandle<Result<()>>>>,
40}
41
42impl<S: ServerPlugin> ServerPluginWrapper<S> {
43 pub fn new(inner: S) -> Self {
44 Self {
45 inner: Arc::new(inner),
46 handle: Mutex::new(None),
47 }
48 }
49}
50
51#[async_trait]
52impl<S: ServerPlugin> Plugin for ServerPluginWrapper<S> {
53 fn name(&self) -> &'static str {
54 self.inner.name()
55 }
56
57 async fn boot(&mut self, ctx: PluginContext) -> Result<()> {
58 let mut slot = self.handle.lock().expect("mutex");
59 if slot.is_some() {
60 return Err(anyhow::anyhow!(
61 "plugin '{}' is already running; boot() called twice",
62 self.inner.name()
63 ));
64 }
65 let inner = self.inner.clone();
66 let handle = tokio::spawn(async move { inner.run(ctx).await });
67 *slot = Some(handle);
68 info!(plugin = self.inner.name(), "server plugin booted");
69 Ok(())
70 }
71
72 async fn shutdown(&self) -> Result<()> {
73 let handle = {
74 let mut slot = self
75 .handle
76 .lock()
77 .map_err(|_| anyhow::anyhow!("ServerPluginWrapper handle mutex poisoned"))?;
78 slot.take()
79 };
80
81 let Some(handle) = handle else {
82 warn!(
83 plugin = self.inner.name(),
84 "shutdown called but no handle present"
85 );
86 return Ok(());
87 };
88
89 handle.abort();
94
95 match handle.await {
96 Ok(Ok(())) => {
97 info!(plugin = self.inner.name(), "run loop completed cleanly");
98 Ok(())
99 }
100 Ok(Err(err)) => {
101 error!(plugin = self.inner.name(), error = ?err, "run loop returned error");
102 Err(err).context("ServerPlugin run loop failed")
103 }
104 Err(join_err) if join_err.is_cancelled() => {
105 info!(
106 plugin = self.inner.name(),
107 "run loop cancelled during shutdown"
108 );
109 Ok(())
110 }
111 Err(join_err) => {
112 error!(plugin = self.inner.name(), error = %join_err, "run loop join failed");
113 Err(anyhow::anyhow!("ServerPlugin task join failed: {join_err}"))
114 }
115 }
116 }
117
118 fn rpc_methods(&self) -> Vec<RpcMethodDef> {
119 self.inner.rpc_methods()
120 }
121}