cloudpub_client/plugins/
plugin_trait.rs

1use crate::config::{ClientConfig, ClientOpts};
2use crate::shell::SubProcess;
3use anyhow::{bail, Context, Result};
4use async_trait::async_trait;
5use cloudpub_common::fair_channel::FairSender;
6use cloudpub_common::protocol::message::Message;
7use cloudpub_common::protocol::{Break, ErrorInfo, ErrorKind, ServerEndpoint};
8use cloudpub_common::utils::is_tcp_port_available;
9use parking_lot::RwLock;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tokio::task::JoinHandle;
13use tokio::time::{self, Duration, Instant};
14use tracing::{debug, error};
15
16#[async_trait]
17pub trait Plugin: Send + Sync {
18    /// Name of the plugin
19    fn name(&self) -> &'static str;
20
21    /// Setup the plugin environment
22    async fn setup(
23        &self,
24        config: &Arc<RwLock<ClientConfig>>,
25        opts: &ClientOpts,
26        command_rx: &mut mpsc::Receiver<Message>,
27        result_tx: &mpsc::Sender<Message>,
28    ) -> Result<()>;
29
30    /// Publish a service using this plugin
31    async fn publish(
32        &self,
33        endpoint: &ServerEndpoint,
34        config: &Arc<RwLock<ClientConfig>>,
35        opts: &ClientOpts,
36        result_tx: &mpsc::Sender<Message>,
37    ) -> Result<SubProcess>;
38}
39
40pub struct PluginHandle {
41    guid: String,
42    server: Arc<RwLock<Option<SubProcess>>>,
43    cancel_tx: mpsc::Sender<Message>,
44    task: JoinHandle<()>,
45}
46
47impl PluginHandle {
48    pub fn spawn(
49        plugin: Arc<dyn Plugin>,
50        mut endpoint: ServerEndpoint,
51        config: Arc<RwLock<ClientConfig>>,
52        opts: ClientOpts,
53        to_server_tx: FairSender<Message>,
54    ) -> Self {
55        let guid = endpoint.guid.clone();
56        let guid_for_task = guid.clone();
57        let server = Arc::new(RwLock::new(None));
58        let server_task = server.clone();
59
60        // Per-process cancel and event channels
61        let (cancel_tx, mut cancel_rx) = mpsc::channel::<Message>(1);
62        let (proc_event_tx, mut proc_event_rx) = mpsc::channel::<Message>(1024);
63
64        let task = tokio::spawn(async move {
65            // Forward per-process events to the main client event channel
66            tokio::spawn({
67                let mut endpoint = endpoint.clone();
68                let to_server_tx = to_server_tx.clone();
69                let guid_for_task = guid_for_task.clone();
70                async move {
71                    use tokio::time::{Duration, Instant};
72
73                    let mut last_progress_time = Instant::now() - Duration::from_secs(1); // Allow first progress immediately
74
75                    while let Some(mut msg) = proc_event_rx.recv().await {
76                        let should_send = match &mut msg {
77                            Message::Progress(progress_info) => {
78                                progress_info.guid = guid_for_task.clone();
79
80                                let now = Instant::now();
81                                // Always send 0% and 100% progress messages, or throttle to 1 per second
82                                if progress_info.current == 0
83                                    || progress_info.current >= progress_info.total
84                                    || now.duration_since(last_progress_time)
85                                        >= Duration::from_secs(1)
86                                {
87                                    last_progress_time = now;
88                                    true
89                                } else {
90                                    false
91                                }
92                            }
93                            _ => true, // Always send non-progress messages
94                        };
95
96                        if should_send {
97                            to_server_tx.send(msg.clone()).await.ok();
98                        }
99                    }
100
101                    endpoint.status = Some("offline".to_string());
102                    to_server_tx
103                        .send(Message::EndpointStatus(endpoint.clone()))
104                        .await
105                        .ok();
106                }
107            });
108
109            // Initial lifecycle status: waiting
110            endpoint.status = Some("waiting".to_string());
111            let _ = to_server_tx
112                .send(Message::EndpointStatus(endpoint.clone()))
113                .await;
114
115            let res: Result<()> = async {
116                // Long running setup: cancellable via cancel_rx
117                plugin
118                    .setup(&config, &opts, &mut cancel_rx, &proc_event_tx)
119                    .await
120                    .context("Failed to setup plugin")?;
121
122                let server_process = plugin
123                    .publish(&endpoint, &config, &opts, &proc_event_tx)
124                    .await
125                    .context("Failed to publish plugin service")?;
126
127                // Wait until the port is bound
128                let now = Instant::now();
129                while is_tcp_port_available("127.0.0.1", server_process.port)
130                    .await
131                    .context("Check port availability")?
132                {
133                    if server_process.result.read().is_err() {
134                        return Ok(()); // Error already reported by subprocess
135                    }
136
137                    if now.elapsed() > Duration::from_secs(60) {
138                        bail!("{}", crate::t!("error-start-server"));
139                    }
140                    debug!(
141                        "Waiting for server to start on port {}",
142                        server_process.port
143                    );
144                    time::sleep(Duration::from_secs(1)).await;
145                }
146
147                *server_task.write() = Some(server_process);
148                Ok(())
149            }
150            .await;
151
152            match res {
153                Ok(()) => {
154                    endpoint.status = Some("online".into());
155                    let _ = to_server_tx.send(Message::EndpointStatus(endpoint)).await;
156                }
157                Err(e) => {
158                    error!("Error handling endpoint {}: {:#}", &guid_for_task, e);
159                    let _ = to_server_tx
160                        .send(Message::Error(ErrorInfo {
161                            kind: ErrorKind::PublishFailed.into(),
162                            message: e.to_string(),
163                            guid: guid_for_task,
164                        }))
165                        .await;
166                }
167            }
168        });
169
170        Self {
171            guid,
172            server,
173            cancel_tx,
174            task,
175        }
176    }
177
178    pub fn port(&self) -> Option<u16> {
179        self.server.read().as_ref().map(|s| s.port)
180    }
181
182    pub fn guid(&self) -> &str {
183        &self.guid
184    }
185
186    pub fn send_break(&self) {
187        let _ = self.cancel_tx.try_send(Message::Break(Break {
188            guid: self.guid.clone(),
189        }));
190    }
191}
192
193impl Drop for PluginHandle {
194    fn drop(&mut self) {
195        // Abort main task and drop cancel_tx to notify setup to stop
196        self.task.abort();
197        // cancel_tx dropped here; receiver sees end-of-stream and should terminate
198        let _ = self.cancel_tx.try_send(Message::Break(Break {
199            guid: self.guid.clone(),
200        }));
201    }
202}