cloudpub_client/plugins/
plugin_trait.rs1use crate::config::{ClientConfig, ClientOpts};
2use crate::shell::SubProcess;
3use anyhow::{bail, Context, Result};
4use async_trait::async_trait;
5use cloudpub_common::fair_channel::FairSender;
6use cloudpub_common::protocol::message::Message;
7use cloudpub_common::protocol::{Break, ErrorInfo, ErrorKind, ServerEndpoint};
8use cloudpub_common::utils::is_tcp_port_available;
9use parking_lot::RwLock;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tokio::task::JoinHandle;
13use tokio::time::{self, Duration, Instant};
14use tracing::{debug, error};
15
16#[async_trait]
17pub trait Plugin: Send + Sync {
18 fn name(&self) -> &'static str;
20
21 async fn setup(
23 &self,
24 config: &Arc<RwLock<ClientConfig>>,
25 opts: &ClientOpts,
26 command_rx: &mut mpsc::Receiver<Message>,
27 result_tx: &mpsc::Sender<Message>,
28 ) -> Result<()>;
29
30 async fn publish(
32 &self,
33 endpoint: &ServerEndpoint,
34 config: &Arc<RwLock<ClientConfig>>,
35 opts: &ClientOpts,
36 result_tx: &mpsc::Sender<Message>,
37 ) -> Result<SubProcess>;
38}
39
40pub struct PluginHandle {
41 guid: String,
42 server: Arc<RwLock<Option<SubProcess>>>,
43 cancel_tx: mpsc::Sender<Message>,
44 task: JoinHandle<()>,
45}
46
47impl PluginHandle {
48 pub fn spawn(
49 plugin: Arc<dyn Plugin>,
50 mut endpoint: ServerEndpoint,
51 config: Arc<RwLock<ClientConfig>>,
52 opts: ClientOpts,
53 to_server_tx: FairSender<Message>,
54 ) -> Self {
55 let guid = endpoint.guid.clone();
56 let guid_for_task = guid.clone();
57 let server = Arc::new(RwLock::new(None));
58 let server_task = server.clone();
59
60 let (cancel_tx, mut cancel_rx) = mpsc::channel::<Message>(1);
62 let (proc_event_tx, mut proc_event_rx) = mpsc::channel::<Message>(1024);
63
64 let task = tokio::spawn(async move {
65 tokio::spawn({
67 let mut endpoint = endpoint.clone();
68 let to_server_tx = to_server_tx.clone();
69 let guid_for_task = guid_for_task.clone();
70 async move {
71 use tokio::time::{Duration, Instant};
72
73 let mut last_progress_time = Instant::now() - Duration::from_secs(1); while let Some(mut msg) = proc_event_rx.recv().await {
76 let should_send = match &mut msg {
77 Message::Progress(progress_info) => {
78 progress_info.guid = guid_for_task.clone();
79
80 let now = Instant::now();
81 if progress_info.current == 0
83 || progress_info.current >= progress_info.total
84 || now.duration_since(last_progress_time)
85 >= Duration::from_secs(1)
86 {
87 last_progress_time = now;
88 true
89 } else {
90 false
91 }
92 }
93 _ => true, };
95
96 if should_send {
97 to_server_tx.send(msg.clone()).await.ok();
98 }
99 }
100
101 endpoint.status = Some("offline".to_string());
102 to_server_tx
103 .send(Message::EndpointStatus(endpoint.clone()))
104 .await
105 .ok();
106 }
107 });
108
109 endpoint.status = Some("waiting".to_string());
111 let _ = to_server_tx
112 .send(Message::EndpointStatus(endpoint.clone()))
113 .await;
114
115 let res: Result<()> = async {
116 plugin
118 .setup(&config, &opts, &mut cancel_rx, &proc_event_tx)
119 .await
120 .context("Failed to setup plugin")?;
121
122 let server_process = plugin
123 .publish(&endpoint, &config, &opts, &proc_event_tx)
124 .await
125 .context("Failed to publish plugin service")?;
126
127 let now = Instant::now();
129 while is_tcp_port_available("127.0.0.1", server_process.port)
130 .await
131 .context("Check port availability")?
132 {
133 if server_process.result.read().is_err() {
134 return Ok(()); }
136
137 if now.elapsed() > Duration::from_secs(60) {
138 bail!("{}", crate::t!("error-start-server"));
139 }
140 debug!(
141 "Waiting for server to start on port {}",
142 server_process.port
143 );
144 time::sleep(Duration::from_secs(1)).await;
145 }
146
147 *server_task.write() = Some(server_process);
148 Ok(())
149 }
150 .await;
151
152 match res {
153 Ok(()) => {
154 endpoint.status = Some("online".into());
155 let _ = to_server_tx.send(Message::EndpointStatus(endpoint)).await;
156 }
157 Err(e) => {
158 error!("Error handling endpoint {}: {:#}", &guid_for_task, e);
159 let _ = to_server_tx
160 .send(Message::Error(ErrorInfo {
161 kind: ErrorKind::PublishFailed.into(),
162 message: e.to_string(),
163 guid: guid_for_task,
164 }))
165 .await;
166 }
167 }
168 });
169
170 Self {
171 guid,
172 server,
173 cancel_tx,
174 task,
175 }
176 }
177
178 pub fn port(&self) -> Option<u16> {
179 self.server.read().as_ref().map(|s| s.port)
180 }
181
182 pub fn guid(&self) -> &str {
183 &self.guid
184 }
185
186 pub fn send_break(&self) {
187 let _ = self.cancel_tx.try_send(Message::Break(Break {
188 guid: self.guid.clone(),
189 }));
190 }
191}
192
193impl Drop for PluginHandle {
194 fn drop(&mut self) {
195 self.task.abort();
197 let _ = self.cancel_tx.try_send(Message::Break(Break {
199 guid: self.guid.clone(),
200 }));
201 }
202}