Skip to main content

pim_messaging/
plugin.rs

1//! [`pim_plugin::DaemonPlugin`] implementation that delegates to
2//! [`crate::service::MessagingService`].
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use bytes::Bytes;
8use pim_core::NodeId;
9use pim_plugin::{DaemonPlugin, PluginContext};
10
11use crate::service::MessagingService;
12use crate::wire::{KIND_ACK, KIND_MESSAGE};
13
14const KINDS: &[&str] = &[KIND_MESSAGE, KIND_ACK];
15
16/// Daemon plugin shell.
17///
18/// The daemon constructs the [`MessagingService`] first (since its RPC
19/// layer needs a direct handle to call `send`, `mark_*`, history, etc.)
20/// and then wraps it with this plugin so payload dispatch and lifecycle
21/// hooks can flow through the same instance.
22pub struct MessagingPlugin {
23    service: Arc<MessagingService>,
24}
25
26impl MessagingPlugin {
27    /// Wrap an existing [`MessagingService`] as a [`DaemonPlugin`].
28    pub fn new(service: Arc<MessagingService>) -> Arc<Self> {
29        Arc::new(Self { service })
30    }
31}
32
33#[async_trait]
34impl DaemonPlugin for MessagingPlugin {
35    fn name(&self) -> &'static str {
36        "messaging"
37    }
38
39    fn payload_kinds(&self) -> &'static [&'static str] {
40        KINDS
41    }
42
43    async fn handle_payload(&self, src: NodeId, kind: &str, body: Bytes) {
44        match kind {
45            KIND_MESSAGE => self.service.handle_incoming_message(src, body).await,
46            KIND_ACK => self.service.handle_incoming_message_ack(src, body).await,
47            _ => {}
48        }
49    }
50
51    async fn start(self: Arc<Self>, _ctx: PluginContext) -> anyhow::Result<()> {
52        // No background tasks: the plugin reacts to inbound payloads
53        // and to RPC calls dispatched into `MessagingService` directly.
54        Ok(())
55    }
56
57    async fn on_peer_forgotten(&self, peer: NodeId) {
58        self.service.on_peer_forgotten(peer).await;
59    }
60
61    async fn shutdown(&self) {}
62}