taiga_plugin_api/daemon/traits.rs
1//! Generic daemon framework traits and runner
2//!
3//! Provides a structured way to implement daemon-based plugins with minimal boilerplate.
4//!
//! # Example
//!
//! ```rust,ignore
//! use taiga_plugin_api::daemon::traits::{
//!     run_daemon_loop, DaemonConfig, DaemonHandler, HandleResult,
//! };
//! use async_trait::async_trait;
//!
//! struct MyDaemon {
//!     counter: u32,
//! }
//!
//! #[async_trait]
//! impl DaemonHandler for MyDaemon {
//!     type Command = MyCommand;
//!     type Response = MyResponse;
//!
//!     async fn handle_command(&mut self, cmd: Self::Command) -> HandleResult<Self::Response> {
//!         match cmd {
//!             MyCommand::Increment => {
//!                 self.counter += 1;
//!                 HandleResult::response(MyResponse::Ok(self.counter))
//!             }
//!         }
//!     }
//!
//!     async fn on_tick(&mut self) {
//!         // Called periodically
//!     }
//! }
//!
//! // In your daemon entry point:
//! let config = DaemonConfig::new("/tmp/my-plugin.sock");
//! run_daemon_loop(config, MyDaemon { counter: 0 }).await?;
//! ```
38
39use super::ipc::{receive_message, send_message};
40use super::socket;
41use crate::PluginError;
42use async_trait::async_trait;
43use interprocess::local_socket::tokio::Stream as LocalSocketStream;
44use interprocess::local_socket::traits::tokio::Listener as _;
45use serde::{Deserialize, Serialize};
46use std::sync::Arc;
47use std::time::Duration;
48use tokio::sync::Mutex;
49use tokio::time;
50
/// Settings that control how a daemon is hosted.
///
/// Start from [`DaemonConfig::new`] and adjust individual values with the
/// chainable `with_*` methods.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Location of the socket file the daemon listens on
    pub socket_path: String,
    /// Number of seconds between two consecutive tick calls
    pub tick_interval_secs: u64,
    /// Size of the buffer used for IPC messages
    pub buffer_size: usize,
}

impl DaemonConfig {
    /// Build a configuration for `socket_path` with the default values:
    /// a tick interval of 1 second and a buffer size of 1024.
    pub fn new(socket_path: impl Into<String>) -> Self {
        let socket_path = socket_path.into();
        Self {
            socket_path,
            tick_interval_secs: 1,
            buffer_size: 1024,
        }
    }

    /// Return the configuration with the tick interval replaced by `secs` seconds
    pub fn with_tick_interval(self, secs: u64) -> Self {
        Self {
            tick_interval_secs: secs,
            ..self
        }
    }

    /// Return the configuration with the IPC buffer size replaced by `size`
    pub fn with_buffer_size(self, size: usize) -> Self {
        Self {
            buffer_size: size,
            ..self
        }
    }
}
84
/// Outcome produced by a command handler.
///
/// Besides carrying the response of type `R`, the variant tells the daemon
/// whether it should keep running afterwards.
#[derive(Debug, Clone)]
pub enum HandleResult<R> {
    /// Deliver the response, then keep the daemon running
    Response(R),
    /// Deliver the response, then stop the daemon
    Shutdown(R),
}

impl<R> HandleResult<R> {
    /// Wrap `r` in a result that leaves the daemon running
    pub fn response(r: R) -> Self {
        Self::Response(r)
    }

    /// Wrap `r` in a result that stops the daemon
    pub fn shutdown(r: R) -> Self {
        Self::Shutdown(r)
    }
}
105
106/// Trait for implementing daemon command handlers
107///
108/// Implement this trait to define how your daemon handles commands and periodic ticks.
109#[async_trait]
110pub trait DaemonHandler: Send + Sync + 'static {
111 /// The command type this daemon accepts
112 type Command: for<'de> Deserialize<'de> + Send;
113
114 /// The response type this daemon returns
115 type Response: Serialize + Send + Sync;
116
117 /// Handle an incoming command
118 ///
119 /// This method is called whenever a client sends a command.
120 /// Return `HandleResult::Response` to continue running, or
121 /// `HandleResult::Shutdown` to stop the daemon after sending the response.
122 async fn handle_command(&mut self, cmd: Self::Command) -> HandleResult<Self::Response>;
123
124 /// Called periodically based on the tick interval
125 ///
126 /// Use this for background tasks like checking timers, cleaning up resources, etc.
127 /// The default implementation does nothing.
128 async fn on_tick(&mut self) {}
129
130 /// Called when the daemon starts, before accepting connections
131 ///
132 /// Use this for initialization that needs to happen after the socket is created.
133 /// The default implementation does nothing.
134 fn on_start(&mut self) {}
135
136 /// Called when the daemon is shutting down
137 ///
138 /// Use this for cleanup tasks.
139 /// The default implementation does nothing.
140 fn on_shutdown(&mut self) {}
141}
142
143/// Run the daemon event loop
144///
145/// This function:
146/// 1. Sets up the socket listener
147/// 2. Calls `on_start` on the handler
148/// 3. Runs the main event loop, handling:
149/// - Incoming client connections
150/// - Periodic tick events
151/// 4. Calls `on_shutdown` when the daemon exits
152///
153/// # Arguments
154/// * `config` - Daemon configuration (socket path, intervals, etc.)
155/// * `handler` - The daemon handler implementing the business logic
156///
157/// # Returns
158/// Returns an error if socket setup fails or a fatal error occurs.
159pub async fn run_daemon_loop<H>(
160 config: DaemonConfig,
161 handler: H,
162) -> Result<(), PluginError>
163where
164 H: DaemonHandler,
165{
166 // Clean up any existing socket
167 socket::cleanup_socket(&config.socket_path);
168
169 // Create listener
170 let listener = socket::create_listener(&config.socket_path)?;
171
172 println!("Daemon listening at: {}", config.socket_path);
173
174 // Wrap handler in Arc<Mutex> for shared access
175 let handler = Arc::new(Mutex::new(handler));
176
177 // Call on_start
178 {
179 let mut h = handler.lock().await;
180 h.on_start();
181 }
182
183 // Create tick interval
184 let mut interval = time::interval(Duration::from_secs(config.tick_interval_secs));
185
186 // Track if we should shut down
187 let should_shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false));
188
189 loop {
190 // Check if we should shut down
191 if should_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
192 break;
193 }
194
195 tokio::select! {
196 _ = interval.tick() => {
197 let mut h = handler.lock().await;
198 h.on_tick().await;
199 }
200
201 result = listener.accept() => {
202 match result {
203 Ok(mut stream) => {
204 let handler_clone = handler.clone();
205 let buffer_size = config.buffer_size;
206 let shutdown_flag = should_shutdown.clone();
207
208 tokio::spawn(async move {
209 if let Err(e) = handle_connection(
210 &mut stream,
211 handler_clone,
212 buffer_size,
213 shutdown_flag,
214 ).await {
215 eprintln!("Error handling client: {}", e);
216 }
217 });
218 }
219 Err(e) => eprintln!("Connection error: {}", e),
220 }
221 }
222 }
223 }
224
225 // Call on_shutdown
226 {
227 let mut h = handler.lock().await;
228 h.on_shutdown();
229 }
230
231 // Clean up socket
232 socket::cleanup_socket(&config.socket_path);
233
234 Ok(())
235}
236
237async fn handle_connection<H>(
238 stream: &mut LocalSocketStream,
239 handler: Arc<Mutex<H>>,
240 buffer_size: usize,
241 shutdown_flag: Arc<std::sync::atomic::AtomicBool>,
242) -> Result<(), PluginError>
243where
244 H: DaemonHandler,
245{
246 // Receive command
247 let cmd: H::Command = match receive_message(stream, buffer_size).await {
248 Ok(cmd) => cmd,
249 Err(PluginError::IpcConnection { message, .. }) if message.contains("closed") => {
250 // Client disconnected without sending
251 return Ok(());
252 }
253 Err(e) => return Err(e),
254 };
255
256 // Handle command
257 let result = {
258 let mut h = handler.lock().await;
259 h.handle_command(cmd).await
260 };
261
262 // Send response and check for shutdown
263 match result {
264 HandleResult::Response(response) => {
265 send_message(stream, &response).await?;
266 }
267 HandleResult::Shutdown(response) => {
268 send_message(stream, &response).await?;
269 shutdown_flag.store(true, std::sync::atomic::Ordering::Relaxed);
270 }
271 }
272
273 Ok(())
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created config keeps the socket path and applies the defaults.
    #[test]
    fn test_daemon_config_creation() {
        let DaemonConfig {
            socket_path,
            tick_interval_secs,
            buffer_size,
        } = DaemonConfig::new("/tmp/test.sock");

        assert_eq!(socket_path, "/tmp/test.sock");
        assert_eq!(tick_interval_secs, 1);
        assert_eq!(buffer_size, 1024);
    }

    /// The builder methods override the tick interval and the buffer size.
    #[test]
    fn test_daemon_config_with_options() {
        let base = DaemonConfig::new("/tmp/test.sock");
        let config = base.with_tick_interval(5).with_buffer_size(2048);

        assert_eq!(config.tick_interval_secs, 5);
        assert_eq!(config.buffer_size, 2048);
    }
}