taiga_plugin_api/daemon/
traits.rs

1//! Generic daemon framework traits and runner
2//!
3//! Provides a structured way to implement daemon-based plugins with minimal boilerplate.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use taiga_plugin_api::daemon::traits::{DaemonHandler, DaemonConfig, run_daemon_loop};
9//! use async_trait::async_trait;
10//!
11//! struct MyDaemon {
12//!     counter: u32,
13//! }
14//!
15//! #[async_trait]
16//! impl DaemonHandler for MyDaemon {
17//!     type Command = MyCommand;
18//!     type Response = MyResponse;
19//!
20//!     async fn handle_command(&mut self, cmd: Self::Command) -> Self::Response {
21//!         match cmd {
22//!             MyCommand::Increment => {
23//!                 self.counter += 1;
24//!                 MyResponse::Ok(self.counter)
25//!             }
26//!         }
27//!     }
28//!
29//!     async fn on_tick(&mut self) {
30//!         // Called periodically
31//!     }
32//! }
33//!
34//! // In your daemon entry point:
35//! let config = DaemonConfig::new("/tmp/my-plugin.sock");
36//! run_daemon_loop(config, MyDaemon { counter: 0 }).await?;
37//! ```
38
39use super::ipc::{receive_message, send_message};
40use super::socket;
41use crate::PluginError;
42use async_trait::async_trait;
43use interprocess::local_socket::tokio::Stream as LocalSocketStream;
44use interprocess::local_socket::traits::tokio::Listener as _;
45use serde::{Deserialize, Serialize};
46use std::sync::Arc;
47use std::time::Duration;
48use tokio::sync::Mutex;
49use tokio::time;
50
51/// Configuration for running a daemon
52#[derive(Debug, Clone)]
53pub struct DaemonConfig {
54    /// Path to the socket file
55    pub socket_path: String,
56    /// Interval between tick calls (in seconds)
57    pub tick_interval_secs: u64,
58    /// Buffer size for IPC messages
59    pub buffer_size: usize,
60}
61
62impl DaemonConfig {
63    /// Create a new daemon configuration with the given socket path
64    pub fn new(socket_path: impl Into<String>) -> Self {
65        Self {
66            socket_path: socket_path.into(),
67            tick_interval_secs: 1,
68            buffer_size: 1024,
69        }
70    }
71
72    /// Set the tick interval
73    pub fn with_tick_interval(mut self, secs: u64) -> Self {
74        self.tick_interval_secs = secs;
75        self
76    }
77
78    /// Set the buffer size
79    pub fn with_buffer_size(mut self, size: usize) -> Self {
80        self.buffer_size = size;
81        self
82    }
83}
84
85/// Result of handling a command
86#[derive(Debug, Clone)]
87pub enum HandleResult<R> {
88    /// Send the response and continue
89    Response(R),
90    /// Send the response and shut down the daemon
91    Shutdown(R),
92}
93
94impl<R> HandleResult<R> {
95    /// Create a response that continues the daemon
96    pub fn response(r: R) -> Self {
97        HandleResult::Response(r)
98    }
99
100    /// Create a response that shuts down the daemon
101    pub fn shutdown(r: R) -> Self {
102        HandleResult::Shutdown(r)
103    }
104}
105
106/// Trait for implementing daemon command handlers
107///
108/// Implement this trait to define how your daemon handles commands and periodic ticks.
109#[async_trait]
110pub trait DaemonHandler: Send + Sync + 'static {
111    /// The command type this daemon accepts
112    type Command: for<'de> Deserialize<'de> + Send;
113
114    /// The response type this daemon returns
115    type Response: Serialize + Send + Sync;
116
117    /// Handle an incoming command
118    ///
119    /// This method is called whenever a client sends a command.
120    /// Return `HandleResult::Response` to continue running, or
121    /// `HandleResult::Shutdown` to stop the daemon after sending the response.
122    async fn handle_command(&mut self, cmd: Self::Command) -> HandleResult<Self::Response>;
123
124    /// Called periodically based on the tick interval
125    ///
126    /// Use this for background tasks like checking timers, cleaning up resources, etc.
127    /// The default implementation does nothing.
128    async fn on_tick(&mut self) {}
129
130    /// Called when the daemon starts, before accepting connections
131    ///
132    /// Use this for initialization that needs to happen after the socket is created.
133    /// The default implementation does nothing.
134    fn on_start(&mut self) {}
135
136    /// Called when the daemon is shutting down
137    ///
138    /// Use this for cleanup tasks.
139    /// The default implementation does nothing.
140    fn on_shutdown(&mut self) {}
141}
142
143/// Run the daemon event loop
144///
145/// This function:
146/// 1. Sets up the socket listener
147/// 2. Calls `on_start` on the handler
148/// 3. Runs the main event loop, handling:
149///    - Incoming client connections
150///    - Periodic tick events
151/// 4. Calls `on_shutdown` when the daemon exits
152///
153/// # Arguments
154/// * `config` - Daemon configuration (socket path, intervals, etc.)
155/// * `handler` - The daemon handler implementing the business logic
156///
157/// # Returns
158/// Returns an error if socket setup fails or a fatal error occurs.
159pub async fn run_daemon_loop<H>(
160    config: DaemonConfig,
161    handler: H,
162) -> Result<(), PluginError>
163where
164    H: DaemonHandler,
165{
166    // Clean up any existing socket
167    socket::cleanup_socket(&config.socket_path);
168
169    // Create listener
170    let listener = socket::create_listener(&config.socket_path)?;
171
172    println!("Daemon listening at: {}", config.socket_path);
173
174    // Wrap handler in Arc<Mutex> for shared access
175    let handler = Arc::new(Mutex::new(handler));
176
177    // Call on_start
178    {
179        let mut h = handler.lock().await;
180        h.on_start();
181    }
182
183    // Create tick interval
184    let mut interval = time::interval(Duration::from_secs(config.tick_interval_secs));
185
186    // Track if we should shut down
187    let should_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
188
189    loop {
190        // Check if we should shut down
191        if should_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
192            break;
193        }
194
195        tokio::select! {
196            _ = interval.tick() => {
197                let mut h = handler.lock().await;
198                h.on_tick().await;
199            }
200
201            result = listener.accept() => {
202                match result {
203                    Ok(mut stream) => {
204                        let handler_clone = handler.clone();
205                        let buffer_size = config.buffer_size;
206                        let shutdown_flag = should_shutdown.clone();
207
208                        tokio::spawn(async move {
209                            if let Err(e) = handle_connection(
210                                &mut stream,
211                                handler_clone,
212                                buffer_size,
213                                shutdown_flag,
214                            ).await {
215                                eprintln!("Error handling client: {}", e);
216                            }
217                        });
218                    }
219                    Err(e) => eprintln!("Connection error: {}", e),
220                }
221            }
222        }
223    }
224
225    // Call on_shutdown
226    {
227        let mut h = handler.lock().await;
228        h.on_shutdown();
229    }
230
231    // Clean up socket
232    socket::cleanup_socket(&config.socket_path);
233
234    Ok(())
235}
236
237async fn handle_connection<H>(
238    stream: &mut LocalSocketStream,
239    handler: Arc<Mutex<H>>,
240    buffer_size: usize,
241    shutdown_flag: Arc<std::sync::atomic::AtomicBool>,
242) -> Result<(), PluginError>
243where
244    H: DaemonHandler,
245{
246    // Receive command
247    let cmd: H::Command = match receive_message(stream, buffer_size).await {
248        Ok(cmd) => cmd,
249        Err(PluginError::IpcConnection { message, .. }) if message.contains("closed") => {
250            // Client disconnected without sending
251            return Ok(());
252        }
253        Err(e) => return Err(e),
254    };
255
256    // Handle command
257    let result = {
258        let mut h = handler.lock().await;
259        h.handle_command(cmd).await
260    };
261
262    // Send response and check for shutdown
263    match result {
264        HandleResult::Response(response) => {
265            send_message(stream, &response).await?;
266        }
267        HandleResult::Shutdown(response) => {
268            send_message(stream, &response).await?;
269            shutdown_flag.store(true, std::sync::atomic::Ordering::Relaxed);
270        }
271    }
272
273    Ok(())
274}
275
276#[cfg(test)]
277mod tests {
278    use super::*;
279
280    #[test]
281    fn test_daemon_config_creation() {
282        let config = DaemonConfig::new("/tmp/test.sock");
283        assert_eq!(config.socket_path, "/tmp/test.sock");
284        assert_eq!(config.tick_interval_secs, 1);
285        assert_eq!(config.buffer_size, 1024);
286    }
287
288    #[test]
289    fn test_daemon_config_with_options() {
290        let config = DaemonConfig::new("/tmp/test.sock")
291            .with_tick_interval(5)
292            .with_buffer_size(2048);
293        assert_eq!(config.tick_interval_secs, 5);
294        assert_eq!(config.buffer_size, 2048);
295    }
296}