1#[doc = include_str!("../README.md")]
2#[path = "generated/df.plugin.rs"]
3mod df_plugin;
4
5pub mod types {
6 pub use super::df_plugin::plugin_client::PluginClient;
7 pub use super::df_plugin::*;
8 pub use super::df_plugin::{
9 action::Kind as ActionKind, event_envelope::Payload as EventPayload,
10 event_result::Update as EventResultUpdate, host_to_plugin::Payload as HostPayload,
11 plugin_to_host::Payload as PluginPayload,
12 };
13}
14
15pub mod event;
16#[path = "server/server.rs"]
17pub mod server;
18
19use std::error::Error;
20
21pub use server::*;
22
23pub use dragonfly_plugin_macro::{event_handler, Plugin};
25pub use event::EventHandler;
26use tokio::sync::mpsc;
27use tokio_stream::{wrappers::ReceiverStream, StreamExt};
28
29#[cfg(unix)]
30use hyper_util::rt::TokioIo;
31#[cfg(unix)]
32use tokio::net::UnixStream;
33
34async fn connect_to_server(
36 addr: &str,
37) -> Result<types::PluginClient<tonic::transport::Channel>, Box<dyn Error>> {
38 if addr.starts_with("unix:") || addr.starts_with('/') {
40 #[cfg(unix)]
41 {
42 let path: String = if addr.starts_with("unix://") {
44 addr[7..].to_string()
45 } else if addr.starts_with("unix:") {
46 addr[5..].to_string()
47 } else {
48 addr.to_string()
49 };
50 let channel = tonic::transport::Endpoint::try_from("http://[::1]:50051")?
55 .connect_with_connector_lazy(service_fn(move |_: tonic::transport::Uri| {
56 let path = path.clone();
57 async move {
58 let stream = UnixStream::connect(&path).await?;
59 Ok::<_, std::io::Error>(TokioIo::new(stream))
60 }
61 }));
62 Ok(types::PluginClient::new(channel))
63 }
64 #[cfg(not(unix))]
65 {
66 Err("Unix sockets are not supported on this platform".into())
67 }
68 } else {
69 Ok(types::PluginClient::connect(addr.to_string()).await?)
71 }
72}
73
74pub struct PluginRunner {}
75
76impl PluginRunner {
77 pub async fn run(plugin: impl Plugin + 'static, addr: &str) -> Result<(), Box<dyn Error>> {
79 let mut raw_client = connect_to_server(addr).await?;
80
81 let (tx, rx) = mpsc::channel(128);
82
83 let hello_msg = types::PluginToHost {
87 plugin_id: plugin.get_id().to_owned(),
88 payload: Some(types::PluginPayload::Hello(types::PluginHello {
89 name: plugin.get_name().to_owned(),
90 version: plugin.get_version().to_owned(),
91 api_version: plugin.get_api_version().to_owned(),
92 commands: vec![],
93 custom_items: vec![],
94 })),
95 };
96 tx.send(hello_msg).await?;
97
98 let request_stream = ReceiverStream::new(rx);
99 let mut event_stream = raw_client.event_stream(request_stream).await?.into_inner();
100
101 let server = Server {
102 plugin_id: plugin.get_id().to_owned(),
103 sender: tx.clone(),
104 };
105
106 let events = plugin.get_subscriptions();
107 if !events.is_empty() {
108 println!("Subscribing to {} event types...", events.len());
109 server.subscribe(events).await?;
110 }
111
112 println!("Plugin '{}' connected and listening.", plugin.get_name());
113
114 while let Some(Ok(msg)) = event_stream.next().await {
116 match msg.payload {
117 Some(types::HostPayload::Event(envelope)) => {
119 event::dispatch_event(&server, &plugin, &envelope).await;
120 }
121 Some(types::HostPayload::Shutdown(shutdown)) => {
123 println!("Server shutting down plugin: {}", shutdown.reason);
124 break; }
126 _ => { }
127 }
128 }
129
130 println!("Plugin '{}' disconnected.", plugin.get_name());
131 Ok(())
132 }
133}
134
135pub trait EventSubscriptions {
141 fn get_subscriptions(&self) -> Vec<types::EventType>;
142}
143
144pub struct PluginInfo<'a> {
146 pub id: &'a str,
147 pub name: &'a str,
148 pub version: &'a str,
149 pub api_version: &'a str,
150}
151
152pub trait Plugin: EventHandler + EventSubscriptions {
185 fn get_info(&self) -> PluginInfo<'_>;
186
187 fn get_id(&self) -> &str;
188
189 fn get_name(&self) -> &str;
190
191 fn get_version(&self) -> &str;
192
193 fn get_api_version(&self) -> &str;
194}