1#[doc = include_str!("../README.md")]
2#[path = "generated/df.plugin.rs"]
3mod df_plugin;
4
5pub mod types {
6 pub use super::df_plugin::plugin_client::PluginClient;
7 pub use super::df_plugin::*;
8 pub use super::df_plugin::{
9 action::Kind as ActionKind, event_envelope::Payload as EventPayload,
10 event_result::Update as EventResultUpdate, host_to_plugin::Payload as HostPayload,
11 plugin_to_host::Payload as PluginPayload,
12 };
13}
14
15pub mod command;
16pub mod event;
17#[path = "server/server.rs"]
18pub mod server;
19
20use std::error::Error;
21
22pub use server::*;
23
24pub use dragonfly_plugin_macro::{event_handler, Command, Plugin};
26pub use event::EventHandler;
27use tokio::sync::mpsc;
28use tokio_stream::{wrappers::ReceiverStream, StreamExt};
29
30#[cfg(unix)]
31use hyper_util::rt::TokioIo;
32#[cfg(unix)]
33use tokio::net::UnixStream;
34
35use crate::command::CommandRegistry;
36
37async fn connect_to_server(
39 addr: &str,
40) -> Result<types::PluginClient<tonic::transport::Channel>, Box<dyn Error>> {
41 if addr.starts_with("unix:") || addr.starts_with('/') {
43 #[cfg(unix)]
44 {
45 let path: String = if addr.starts_with("unix://") {
47 addr[7..].to_string()
48 } else if addr.starts_with("unix:") {
49 addr[5..].to_string()
50 } else {
51 addr.to_string()
52 };
53 let channel = tonic::transport::Endpoint::try_from("http://[::1]:50051")?
58 .connect_with_connector_lazy(service_fn(move |_: tonic::transport::Uri| {
59 let path = path.clone();
60 async move {
61 let stream = UnixStream::connect(&path).await?;
62 Ok::<_, std::io::Error>(TokioIo::new(stream))
63 }
64 }));
65 Ok(types::PluginClient::new(channel))
66 }
67 #[cfg(not(unix))]
68 {
69 Err("Unix sockets are not supported on this platform".into())
70 }
71 } else {
72 Ok(types::PluginClient::connect(addr.to_string()).await?)
74 }
75}
76
77pub struct PluginRunner {}
78
79impl PluginRunner {
80 pub async fn run(plugin: impl Plugin + 'static, addr: &str) -> Result<(), Box<dyn Error>> {
82 let mut raw_client = connect_to_server(addr).await?;
83
84 let (tx, rx) = mpsc::channel(128);
85
86 let hello_msg = types::PluginToHost {
90 plugin_id: plugin.get_id().to_owned(),
91 payload: Some(types::PluginPayload::Hello(types::PluginHello {
92 name: plugin.get_name().to_owned(),
93 version: plugin.get_version().to_owned(),
94 api_version: plugin.get_api_version().to_owned(),
95 commands: plugin.get_commands(),
96 custom_items: vec![],
97 custom_blocks: vec![],
98 })),
99 };
100 tx.send(hello_msg).await?;
101
102 let request_stream = ReceiverStream::new(rx);
103 let mut event_stream = raw_client.event_stream(request_stream).await?.into_inner();
104
105 let server = Server {
106 plugin_id: plugin.get_id().to_owned(),
107 sender: tx.clone(),
108 };
109
110 let mut events = plugin.get_subscriptions();
111
112 if !plugin.get_commands().is_empty() && !events.contains(&types::EventType::Command) {
114 events.push(types::EventType::Command);
115 }
116
117 if !events.is_empty() {
118 println!("Subscribing to {} event types...", events.len());
119 server.subscribe(events).await?;
120 }
121
122 println!("Plugin '{}' connected and listening.", plugin.get_name());
123
124 while let Some(Ok(msg)) = event_stream.next().await {
126 match msg.payload {
127 Some(types::HostPayload::Event(envelope)) => {
129 event::dispatch_event(&server, &plugin, &envelope).await;
130 }
131 Some(types::HostPayload::Shutdown(shutdown)) => {
133 println!("Server shutting down plugin: {}", shutdown.reason);
134 break; }
136 _ => { }
137 }
138 }
139
140 println!("Plugin '{}' disconnected.", plugin.get_name());
141 Ok(())
142 }
143}
144
145pub trait EventSubscriptions {
151 fn get_subscriptions(&self) -> Vec<types::EventType>;
152}
153
154pub struct PluginInfo<'a> {
156 pub id: &'a str,
157 pub name: &'a str,
158 pub version: &'a str,
159 pub api_version: &'a str,
160}
161
162pub trait Plugin: EventHandler + EventSubscriptions + CommandRegistry {
195 fn get_info(&self) -> PluginInfo<'_>;
196
197 fn get_id(&self) -> &str;
198
199 fn get_name(&self) -> &str;
200
201 fn get_version(&self) -> &str;
202
203 fn get_api_version(&self) -> &str;
204}