dragonfly_plugin/
lib.rs

1#![allow(clippy::all)]
2
3#[path = "generated/df.plugin.rs"]
4mod df_plugin;
5
6pub mod types {
7    pub use super::df_plugin::plugin_client::PluginClient;
8    pub use super::df_plugin::*;
9    pub use super::df_plugin::{
10        action::Kind as ActionKind, event_envelope::Payload as EventPayload,
11        event_result::Update as EventResultUpdate, host_to_plugin::Payload as HostPayload,
12        plugin_to_host::Payload as PluginPayload,
13    };
14}
15
16pub mod event;
17#[path = "server/server.rs"]
18pub mod server;
19
20use std::error::Error;
21
22pub use server::*;
23
24// main usage stuff for plugin devs:
25pub use event::PluginEventHandler;
26pub use rust_plugin_macro::Handler;
27use tokio::sync::mpsc;
28use tokio_stream::{wrappers::ReceiverStream, StreamExt};
29
30#[cfg(unix)]
31use hyper_util::rt::TokioIo;
32#[cfg(unix)]
33use tokio::net::UnixStream;
34
35/// Helper function to connect to the server, supporting both Unix sockets and TCP.
36async fn connect_to_server(
37    addr: &str,
38) -> Result<types::PluginClient<tonic::transport::Channel>, Box<dyn Error>> {
39    // Check if it's a Unix socket address (starts with "unix:" or is a path starting with "/")
40    if addr.starts_with("unix:") || addr.starts_with('/') {
41        #[cfg(unix)]
42        {
43            // Extract the path and convert to owned String for the closure
44            let path: String = if addr.starts_with("unix://") {
45                addr[7..].to_string()
46            } else if addr.starts_with("unix:") {
47                addr[5..].to_string()
48            } else {
49                addr.to_string()
50            };
51            // Create a lazy channel that uses Unix sockets.
52            // Lazy is required so the hello message gets sent as part of stream
53            // establishment, avoiding a deadlock with the Go server which waits
54            // for the hello before sending response headers.
55            let channel = tonic::transport::Endpoint::try_from("http://[::1]:50051")?
56                .connect_with_connector_lazy(service_fn(move |_: tonic::transport::Uri| {
57                    let path = path.clone();
58                    async move {
59                        let stream = UnixStream::connect(&path).await?;
60                        Ok::<_, std::io::Error>(TokioIo::new(stream))
61                    }
62                }));
63            Ok(types::PluginClient::new(channel))
64        }
65        #[cfg(not(unix))]
66        {
67            Err("Unix sockets are not supported on this platform".into())
68        }
69    } else {
70        // Regular TCP connection
71        Ok(types::PluginClient::connect(addr.to_string()).await?)
72    }
73}
74
75pub struct Plugin {
76    id: String,
77    name: String,
78    version: String,
79    api_version: String,
80}
81
82impl Plugin {
83    pub fn new(id: &str, name: &str, version: &str, api_version: &str) -> Self {
84        Self {
85            id: id.to_string(),
86            name: name.to_string(),
87            version: version.to_string(),
88            api_version: api_version.to_string(),
89        }
90    }
91
92    /// Runs the plugin, connecting to the server and starting the event loop.
93    pub async fn run(
94        self,
95        handler: impl PluginEventHandler + PluginSubscriptions + 'static,
96        addr: &str,
97    ) -> Result<(), Box<dyn Error>> {
98        let mut raw_client = connect_to_server(addr).await?;
99
100        let (tx, rx) = mpsc::channel(128);
101
102        // Pre-buffer the hello message so it's sent immediately when stream opens.
103        // This is required because the Go server blocks on Recv() waiting for the
104        // hello before sending response headers.
105        let hello_msg = types::PluginToHost {
106            plugin_id: self.id.clone(),
107            payload: Some(types::PluginPayload::Hello(types::PluginHello {
108                name: self.name.clone(),
109                version: self.version.clone(),
110                api_version: self.api_version.clone(),
111                commands: vec![],
112                custom_items: vec![],
113            })),
114        };
115        tx.send(hello_msg).await?;
116
117        let request_stream = ReceiverStream::new(rx);
118        let mut event_stream = raw_client.event_stream(request_stream).await?.into_inner();
119
120        let server = Server {
121            plugin_id: self.id.clone(),
122            sender: tx.clone(),
123        };
124
125        let events = handler.get_subscriptions();
126        if !events.is_empty() {
127            println!("Subscribing to {} event types...", events.len());
128            server.subscribe(events).await?;
129        }
130
131        println!("Plugin '{}' connected and listening.", self.name);
132
133        // 8. Run the main event loop
134        while let Some(Ok(msg)) = event_stream.next().await {
135            match msg.payload {
136                // We received a game event
137                Some(types::HostPayload::Event(envelope)) => {
138                    event::dispatch_event(&server, &handler, &envelope).await;
139                }
140                // The server is shutting us down
141                Some(types::HostPayload::Shutdown(shutdown)) => {
142                    println!("Server shutting down plugin: {}", shutdown.reason);
143                    break; // Break the loop
144                }
145                _ => { /* Ignore other payloads */ }
146            }
147        }
148
149        println!("Plugin '{}' disconnected.", self.name);
150        Ok(())
151    }
152}
153
154/// A trait that defines which events your plugin will receive.
155///
156/// You can implement this trait manually, or you can use the
157/// `#[bedrock_plugin]` macro on your `PluginEventHandler`
158/// implementation to generate it for you.
159pub trait PluginSubscriptions {
160    fn get_subscriptions(&self) -> Vec<types::EventType>;
161}