1#![allow(clippy::all)]
2
3#[path = "generated/df.plugin.rs"]
4mod df_plugin;
5
6pub mod types {
7 pub use super::df_plugin::plugin_client::PluginClient;
8 pub use super::df_plugin::*;
9 pub use super::df_plugin::{
10 action::Kind as ActionKind, event_envelope::Payload as EventPayload,
11 event_result::Update as EventResultUpdate, host_to_plugin::Payload as HostPayload,
12 plugin_to_host::Payload as PluginPayload,
13 };
14}
15
16pub mod event;
17#[path = "server/server.rs"]
18pub mod server;
19
20use std::error::Error;
21
22pub use server::*;
23
24pub use event::PluginEventHandler;
26pub use rust_plugin_macro::Handler;
27use tokio::sync::mpsc;
28use tokio_stream::{wrappers::ReceiverStream, StreamExt};
29
30#[cfg(unix)]
31use hyper_util::rt::TokioIo;
32#[cfg(unix)]
33use tokio::net::UnixStream;
34
35async fn connect_to_server(
37 addr: &str,
38) -> Result<types::PluginClient<tonic::transport::Channel>, Box<dyn Error>> {
39 if addr.starts_with("unix:") || addr.starts_with('/') {
41 #[cfg(unix)]
42 {
43 let path: String = if addr.starts_with("unix://") {
45 addr[7..].to_string()
46 } else if addr.starts_with("unix:") {
47 addr[5..].to_string()
48 } else {
49 addr.to_string()
50 };
51 let channel = tonic::transport::Endpoint::try_from("http://[::1]:50051")?
56 .connect_with_connector_lazy(service_fn(move |_: tonic::transport::Uri| {
57 let path = path.clone();
58 async move {
59 let stream = UnixStream::connect(&path).await?;
60 Ok::<_, std::io::Error>(TokioIo::new(stream))
61 }
62 }));
63 Ok(types::PluginClient::new(channel))
64 }
65 #[cfg(not(unix))]
66 {
67 Err("Unix sockets are not supported on this platform".into())
68 }
69 } else {
70 Ok(types::PluginClient::connect(addr.to_string()).await?)
72 }
73}
74
/// Identity and version information for one plugin.
///
/// All four values are plain owned strings that `Plugin::run` sends to the
/// host when it announces the plugin.
pub struct Plugin {
    /// Identifier placed in the `plugin_id` field of outgoing messages.
    id: String,
    /// Plugin name; also used in the connect/disconnect log lines.
    name: String,
    /// Version string of the plugin itself.
    version: String,
    /// Version string of the plugin API the plugin reports to the host.
    api_version: String,
}
81
82impl Plugin {
83 pub fn new(id: &str, name: &str, version: &str, api_version: &str) -> Self {
84 Self {
85 id: id.to_string(),
86 name: name.to_string(),
87 version: version.to_string(),
88 api_version: api_version.to_string(),
89 }
90 }
91
92 pub async fn run(
94 self,
95 handler: impl PluginEventHandler + PluginSubscriptions + 'static,
96 addr: &str,
97 ) -> Result<(), Box<dyn Error>> {
98 let mut raw_client = connect_to_server(addr).await?;
99
100 let (tx, rx) = mpsc::channel(128);
101
102 let hello_msg = types::PluginToHost {
106 plugin_id: self.id.clone(),
107 payload: Some(types::PluginPayload::Hello(types::PluginHello {
108 name: self.name.clone(),
109 version: self.version.clone(),
110 api_version: self.api_version.clone(),
111 commands: vec![],
112 custom_items: vec![],
113 })),
114 };
115 tx.send(hello_msg).await?;
116
117 let request_stream = ReceiverStream::new(rx);
118 let mut event_stream = raw_client.event_stream(request_stream).await?.into_inner();
119
120 let server = Server {
121 plugin_id: self.id.clone(),
122 sender: tx.clone(),
123 };
124
125 let events = handler.get_subscriptions();
126 if !events.is_empty() {
127 println!("Subscribing to {} event types...", events.len());
128 server.subscribe(events).await?;
129 }
130
131 println!("Plugin '{}' connected and listening.", self.name);
132
133 while let Some(Ok(msg)) = event_stream.next().await {
135 match msg.payload {
136 Some(types::HostPayload::Event(envelope)) => {
138 event::dispatch_event(&server, &handler, &envelope).await;
139 }
140 Some(types::HostPayload::Shutdown(shutdown)) => {
142 println!("Server shutting down plugin: {}", shutdown.reason);
143 break; }
145 _ => { }
146 }
147 }
148
149 println!("Plugin '{}' disconnected.", self.name);
150 Ok(())
151 }
152}
153
154pub trait PluginSubscriptions {
160 fn get_subscriptions(&self) -> Vec<types::EventType>;
161}