1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::time::Duration;
5
6use chrono::Utc;
7use tokio::sync::{broadcast, Mutex, RwLock};
8use tracing::{error, info, warn};
9
10use crate::daemon::{DaemonInstance, DaemonSpec, HealthStatus};
11use crate::error::{Result, SyspulseError};
12use crate::ipc::protocol::{Request, Response};
13use crate::ipc::server::IpcServer;
14use crate::lifecycle::LifecycleState;
15use crate::logs::LogManager;
16use crate::paths;
17use crate::process::{self, ProcessDriver};
18use crate::registry::Registry;
19use crate::restart::RestartEvaluator;
20use crate::scheduler::Scheduler;
21
/// Central coordinator for all managed daemons: owns the persistent
/// registry, the process driver, per-daemon runtime state, and the
/// health-check tasks, and fans shutdown out via a broadcast channel.
pub struct DaemonManager {
    // Persistent store of daemon specs and last-known instance state.
    registry: Arc<Mutex<Registry>>,
    // Platform-specific spawn/stop/kill/wait implementation.
    process_driver: Arc<dyn ProcessDriver>,
    // Creates and reads per-daemon stdout/stderr log files.
    log_manager: Arc<LogManager>,
    // In-memory runtime state of every known daemon, keyed by spec name.
    instances: Arc<RwLock<HashMap<String, DaemonInstance>>>,
    // Running health-check tasks, keyed by daemon name; aborted on stop/remove.
    health_handles: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
    // Broadcast channel used to signal shutdown to all background tasks.
    shutdown_tx: broadcast::Sender<()>,
}
30
31impl DaemonManager {
32 pub fn new(data_dir: Option<PathBuf>) -> Result<Self> {
34 let data = data_dir.unwrap_or_else(paths::data_dir);
35 paths::ensure_dirs()?;
36
37 let db_path = data.join("syspulse.db");
38 let registry = Registry::new(&db_path)?;
39 let process_driver = process::create_driver();
40 let log_manager = LogManager::new(&data);
41 let (shutdown_tx, _) = broadcast::channel(16);
42
43 let saved_states = registry.list_states().unwrap_or_default();
45 let mut instances = HashMap::new();
46 for inst in saved_states {
47 instances.insert(inst.spec_name.clone(), inst);
48 }
49
50 Ok(Self {
51 registry: Arc::new(Mutex::new(registry)),
52 process_driver: Arc::from(process_driver),
53 log_manager: Arc::new(log_manager),
54 instances: Arc::new(RwLock::new(instances)),
55 health_handles: Arc::new(Mutex::new(HashMap::new())),
56 shutdown_tx,
57 })
58 }
59
60 pub async fn start_daemon(&self, name: &str) -> Result<DaemonInstance> {
62 let spec = {
63 let reg = self.registry.lock().await;
64 reg.get_spec(name)?
65 };
66
67 let mut instances = self.instances.write().await;
69 let instance = instances
70 .entry(name.to_string())
71 .or_insert_with(|| DaemonInstance::new(name));
72
73 let new_state = instance.state.transition_to(LifecycleState::Starting)?;
75 instance.state = new_state;
76
77 let (stdout_path, stderr_path) = self.log_manager.setup_log_files(name)?;
79 instance.stdout_log = Some(stdout_path.clone());
80 instance.stderr_log = Some(stderr_path.clone());
81
82 let proc_info = self
84 .process_driver
85 .spawn(&spec, &stdout_path, &stderr_path)
86 .await?;
87
88 instance.pid = Some(proc_info.pid);
89 instance.started_at = Some(Utc::now());
90 instance.stopped_at = None;
91 instance.exit_code = None;
92 instance.state = instance.state.transition_to(LifecycleState::Running)?;
93
94 if spec.health_check.is_some() {
95 instance.health_status = HealthStatus::Unknown;
96 } else {
97 instance.health_status = HealthStatus::NotConfigured;
98 }
99
100 {
102 let reg = self.registry.lock().await;
103 reg.update_state(instance)?;
104 }
105
106 let result = instance.clone();
107
108 drop(instances);
110
111 if let Some(ref health_spec) = spec.health_check {
113 let daemon_name = name.to_string();
114 let shutdown_rx = self.shutdown_tx.subscribe();
115 let instances = Arc::clone(&self.instances);
116 let registry = Arc::clone(&self.registry);
117 let health_spec: crate::daemon::HealthCheckSpec = health_spec.clone();
118
119 let handle = tokio::spawn(async move {
120 Self::run_health_check(instances, registry, daemon_name, health_spec, shutdown_rx)
121 .await;
122 });
123
124 let mut handles = self.health_handles.lock().await;
125 handles.insert(name.to_string(), handle);
126 }
127
128 info!(
129 "Started daemon '{}' with PID {}",
130 name,
131 result.pid.unwrap_or(0)
132 );
133 Ok(result)
134 }
135
    /// Stops the daemon `name`: gracefully (driver `stop` with the spec's
    /// timeout) or immediately (`force == true` uses `kill`). Returns the
    /// updated instance snapshot.
    ///
    /// # Errors
    /// Fails if the daemon is unknown, not currently active, or the driver
    /// cannot stop the process.
    pub async fn stop_daemon(&self, name: &str, force: bool) -> Result<DaemonInstance> {
        // NOTE(review): the write lock on `instances` is held across the
        // stop/wait awaits below, so all other manager operations block for
        // up to the stop timeout while a daemon shuts down — confirm this is
        // acceptable.
        let mut instances = self.instances.write().await;
        let instance = instances
            .get_mut(name)
            .ok_or_else(|| SyspulseError::DaemonNotFound(name.to_string()))?;

        if !instance.state.is_active() {
            return Err(SyspulseError::InvalidStateTransition {
                from: format!("{:?}", instance.state),
                to: "Stopping".to_string(),
            });
        }

        instance.state = instance.state.transition_to(LifecycleState::Stopping)?;

        // Cancel the health-check task first so it cannot flag the daemon
        // unhealthy mid-shutdown.
        {
            let mut handles = self.health_handles.lock().await;
            if let Some(handle) = handles.remove(name) {
                handle.abort();
            }
        }

        // NOTE(review): if kill()/stop() below returns Err, we bail out with
        // the instance left in `Stopping` — verify callers can recover from
        // that state.
        if let Some(pid) = instance.pid {
            if force {
                self.process_driver.kill(pid).await?;
            } else {
                // Graceful stop honors the spec's timeout; fall back to 30s
                // if the spec can no longer be loaded.
                let timeout = {
                    let reg = self.registry.lock().await;
                    reg.get_spec(name)
                        .map(|s| s.stop_timeout_secs)
                        .unwrap_or(30)
                };
                self.process_driver.stop(pid, timeout).await?;
            }
            // Best-effort exit-code collection; wait errors are discarded.
            let exit_code = self.process_driver.wait(pid).await.ok().flatten();
            instance.exit_code = exit_code;
        }

        instance.state = instance.state.transition_to(LifecycleState::Stopped)?;
        instance.stopped_at = Some(Utc::now());
        instance.pid = None;
        instance.health_status = HealthStatus::Unknown;

        // Persist the stopped state so a manager restart sees it as down.
        {
            let reg = self.registry.lock().await;
            reg.update_state(instance)?;
        }

        info!("Stopped daemon '{}'", name);
        Ok(instance.clone())
    }
192
193 pub async fn restart_daemon(&self, name: &str, force: bool) -> Result<DaemonInstance> {
195 {
197 let instances = self.instances.read().await;
198 if let Some(inst) = instances.get(name) {
199 if inst.state.is_active() {
200 drop(instances);
201 self.stop_daemon(name, force).await?;
202 }
203 }
204 }
205 self.start_daemon(name).await
206 }
207
208 pub async fn status(&self, name: &str) -> Result<DaemonInstance> {
210 let instances = self.instances.read().await;
211 instances
212 .get(name)
213 .cloned()
214 .ok_or_else(|| SyspulseError::DaemonNotFound(name.to_string()))
215 }
216
217 pub async fn list(&self) -> Result<Vec<DaemonInstance>> {
219 let instances = self.instances.read().await;
220 Ok(instances.values().cloned().collect())
221 }
222
    /// Registers a new daemon spec and creates its idle runtime instance.
    ///
    /// # Errors
    /// Fails (and inserts nothing) if registration is rejected, e.g. for a
    /// duplicate name.
    pub async fn add_daemon(&self, spec: DaemonSpec) -> Result<()> {
        let name = spec.name.clone();

        // Persist the spec first; a registration failure (such as a
        // duplicate) returns before the instance map is touched.
        {
            let reg = self.registry.lock().await;
            reg.register(&spec)?;
        }

        let mut instance = DaemonInstance::new(&name);
        // Cron-driven daemons begin life as Scheduled rather than Stopped.
        // NOTE(review): a daemon added while the manager is already running
        // is not registered with the live Scheduler (that only happens in
        // setup_scheduled_daemons at startup) — confirm whether a manager
        // restart is required for a new schedule to take effect.
        if spec.schedule.is_some() {
            instance.state = LifecycleState::Scheduled;
        }

        let mut instances = self.instances.write().await;
        instances.insert(name.clone(), instance);

        info!("Added daemon '{}'", name);
        Ok(())
    }
244
245 pub async fn remove_daemon(&self, name: &str, force: bool) -> Result<()> {
247 {
249 let instances = self.instances.read().await;
250 if let Some(inst) = instances.get(name) {
251 if inst.state.is_active() {
252 if !force {
253 return Err(SyspulseError::Process(format!(
254 "Daemon '{}' is still running. Use force to stop and remove.",
255 name
256 )));
257 }
258 }
259 }
260 }
261
262 if force {
264 let instances = self.instances.read().await;
265 let is_active = instances
266 .get(name)
267 .map(|i| i.state.is_active())
268 .unwrap_or(false);
269 drop(instances);
270 if is_active {
271 self.stop_daemon(name, true).await?;
272 }
273 }
274
275 {
277 let reg = self.registry.lock().await;
278 reg.unregister(name)?;
279 }
280
281 {
283 let mut instances = self.instances.write().await;
284 instances.remove(name);
285 }
286
287 {
289 let mut handles = self.health_handles.lock().await;
290 if let Some(handle) = handles.remove(name) {
291 handle.abort();
292 }
293 }
294
295 info!("Removed daemon '{}'", name);
296 Ok(())
297 }
298
299 pub async fn get_logs(&self, name: &str, lines: usize, stderr: bool) -> Result<Vec<String>> {
301 {
303 let instances = self.instances.read().await;
304 if !instances.contains_key(name) {
305 return Err(SyspulseError::DaemonNotFound(name.to_string()));
306 }
307 }
308 self.log_manager.read_logs(name, lines, stderr)
309 }
310
    /// Dispatches one IPC request to the matching manager operation and
    /// converts the outcome into a wire `Response`.
    ///
    /// Errors are mapped to `Response::Error` with an HTTP-like status code
    /// (see `error_response`); this function itself never fails.
    pub async fn handle_request(self: &Arc<Self>, request: Request) -> Response {
        match request {
            Request::Start { name, .. } => match self.start_daemon(&name).await {
                Ok(inst) => Response::Ok {
                    message: format!("Daemon '{}' started (PID {})", name, inst.pid.unwrap_or(0)),
                },
                Err(e) => error_response(e),
            },
            Request::Stop { name, force, .. } => match self.stop_daemon(&name, force).await {
                Ok(_) => Response::Ok {
                    message: format!("Daemon '{}' stopped", name),
                },
                Err(e) => error_response(e),
            },
            Request::Restart { name, force, .. } => match self.restart_daemon(&name, force).await {
                Ok(inst) => Response::Ok {
                    message: format!(
                        "Daemon '{}' restarted (PID {})",
                        name,
                        inst.pid.unwrap_or(0)
                    ),
                },
                Err(e) => error_response(e),
            },
            // Status with a name returns one instance; without a name it
            // behaves like List.
            Request::Status { name } => match name {
                Some(name) => match self.status(&name).await {
                    Ok(instance) => Response::Status { instance },
                    Err(e) => error_response(e),
                },
                None => match self.list().await {
                    Ok(instances) => Response::List { instances },
                    Err(e) => error_response(e),
                },
            },
            Request::List => match self.list().await {
                Ok(instances) => Response::List { instances },
                Err(e) => error_response(e),
            },
            Request::Logs {
                name,
                lines,
                stderr,
            } => match self.get_logs(&name, lines, stderr).await {
                Ok(log_lines) => Response::Logs { lines: log_lines },
                Err(e) => error_response(e),
            },
            Request::Add { spec } => match self.add_daemon(spec).await {
                Ok(()) => Response::Ok {
                    message: "Daemon added".to_string(),
                },
                Err(e) => error_response(e),
            },
            Request::Remove { name, force } => match self.remove_daemon(&name, force).await {
                Ok(()) => Response::Ok {
                    message: format!("Daemon '{}' removed", name),
                },
                Err(e) => error_response(e),
            },
            Request::Shutdown => {
                info!("Shutdown requested via IPC");
                // Broadcast shutdown to every subscriber (run() loop, IPC
                // server, process monitor, health checkers).
                let _ = self.shutdown_tx.send(());
                Response::Ok {
                    message: "Shutting down".to_string(),
                }
            }
            Request::Ping => Response::Pong,
        }
    }
382
    /// Main event loop: writes the PID file, restores previously-running
    /// daemons, starts the cron scheduler, the IPC server, and the process
    /// monitor, then blocks until Ctrl+C or an IPC shutdown. On shutdown it
    /// stops all daemons and removes the PID file.
    pub async fn run(self: Arc<Self>) -> Result<()> {
        info!("Starting syspulse daemon manager");

        // Record our PID so external tooling can locate the manager.
        let pid_path = paths::pid_path();
        std::fs::write(&pid_path, std::process::id().to_string())?;

        self.restore_running_daemons().await;

        let mut scheduler = Scheduler::new().await?;
        self.setup_scheduled_daemons(&mut scheduler).await?;
        scheduler.start().await?;

        let socket_path = paths::socket_path();
        let ipc_server = IpcServer::new(socket_path);
        let shutdown_rx_ipc = self.shutdown_tx.subscribe();

        // The IPC task owns its own Arc of the manager; each request gets a
        // fresh clone inside the handler closure.
        let manager_for_ipc = Arc::clone(&self);
        let ipc_handle = tokio::spawn(async move {
            let handler = Arc::new(move |req: Request| {
                let mgr = Arc::clone(&manager_for_ipc);
                async move { mgr.handle_request(req).await }
            });
            if let Err(e) = ipc_server.run(handler, shutdown_rx_ipc).await {
                error!("IPC server error: {}", e);
            }
        });

        let manager_for_monitor = Arc::clone(&self);
        let shutdown_rx_monitor = self.shutdown_tx.subscribe();
        let monitor_handle = tokio::spawn(async move {
            Self::monitor_processes(manager_for_monitor, shutdown_rx_monitor).await;
        });

        let shutdown_tx = self.shutdown_tx.clone();
        // NOTE(review): wait_for_shutdown() subscribes to the broadcast only
        // when first polled, so a shutdown sent before this select! starts
        // polling could be missed — verify an early IPC Shutdown cannot race
        // this window.
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                info!("Received Ctrl+C, initiating shutdown");
                let _ = shutdown_tx.send(());
            }
            _ = self.wait_for_shutdown() => {
                info!("Shutdown signal received");
            }
        }

        info!("Stopping all running daemons...");
        self.stop_all_daemons().await;

        scheduler.shutdown().await.ok();

        // Abort the background tasks, then await them so their resources
        // are released before returning.
        ipc_handle.abort();
        monitor_handle.abort();
        let _ = tokio::join!(ipc_handle, monitor_handle);

        // Best-effort cleanup of the PID file.
        std::fs::remove_file(&pid_path).ok();

        info!("Daemon manager shut down cleanly");
        Ok(())
    }
452
453 async fn wait_for_shutdown(&self) {
455 let mut rx = self.shutdown_tx.subscribe();
456 let _ = rx.recv().await;
457 }
458
459 async fn restore_running_daemons(&self) {
461 let instances = self.instances.read().await;
462 let to_restart: Vec<String> = instances
463 .iter()
464 .filter(|(_, inst)| inst.state == LifecycleState::Running)
465 .map(|(name, _)| name.clone())
466 .collect();
467 drop(instances);
468
469 for name in to_restart {
470 {
472 let mut instances = self.instances.write().await;
473 if let Some(inst) = instances.get_mut(&name) {
474 inst.state = LifecycleState::Stopped;
475 inst.pid = None;
476 }
477 }
478 info!("Restoring previously running daemon '{}'", name);
479 if let Err(e) = self.start_daemon(&name).await {
480 error!("Failed to restore daemon '{}': {}", name, e);
481 }
482 }
483 }
484
    /// Registers a cron job with the scheduler for every spec that declares
    /// a `schedule` expression.
    ///
    /// Each job captures a `ManagerComponents` bundle (cheap Arc clones of
    /// the manager's shared state) rather than `&self`, because the
    /// scheduler callback must be `'static`.
    async fn setup_scheduled_daemons(&self, scheduler: &mut Scheduler) -> Result<()> {
        let specs = {
            let reg = self.registry.lock().await;
            reg.list_specs().unwrap_or_default()
        };

        for spec in specs {
            if let Some(ref cron_expr) = spec.schedule {
                // One shared bundle per scheduled daemon; the outer Arc lets
                // every cron firing clone it cheaply.
                let manager = Arc::new({
                    let instances = Arc::clone(&self.instances);
                    let registry = Arc::clone(&self.registry);
                    let process_driver = Arc::clone(&self.process_driver);
                    let log_manager = Arc::clone(&self.log_manager);
                    let shutdown_tx = self.shutdown_tx.clone();
                    let health_handles = Arc::clone(&self.health_handles);
                    ManagerComponents {
                        instances,
                        registry,
                        process_driver,
                        log_manager,
                        shutdown_tx,
                        health_handles,
                    }
                });

                scheduler
                    .schedule_daemon(&spec.name, cron_expr, move |name| {
                        let mgr = Arc::clone(&manager);
                        async move {
                            info!("Cron trigger: starting daemon '{}'", name);
                            // Failures are logged; the next firing retries.
                            if let Err(e) = cron_start_daemon(&mgr, &name).await {
                                error!("Cron failed to start '{}': {}", name, e);
                            }
                        }
                    })
                    .await?;
            }
        }

        Ok(())
    }
529
    /// Stops every active daemon; used during manager shutdown. A failed
    /// graceful stop is retried once with force; remaining failures are
    /// only logged.
    async fn stop_all_daemons(&self) {
        let names: Vec<String> = {
            let instances = self.instances.read().await;
            instances
                .iter()
                .filter(|(_, inst)| inst.state.is_active())
                .map(|(name, _)| name.clone())
                .collect()
        };

        for name in names {
            if let Err(e) = self.stop_daemon(&name, false).await {
                warn!("Failed to stop daemon '{}' during shutdown: {}", name, e);
                // NOTE(review): if the graceful attempt already moved the
                // instance into `Stopping`, the forced retry's lifecycle
                // transition may be rejected — verify stop_daemon accepts a
                // retry from that state.
                if let Err(e2) = self.stop_daemon(&name, true).await {
                    error!("Failed to force-stop daemon '{}': {}", name, e2);
                }
            }
        }
    }
551
    /// Background task: polls once per second for Running daemons whose
    /// process has died, marks them Failed, persists the change
    /// (best-effort), and schedules a delayed restart when the spec's
    /// restart policy allows it. Exits on the shutdown broadcast.
    async fn monitor_processes(
        manager: Arc<DaemonManager>,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) {
        let mut interval = tokio::time::interval(Duration::from_secs(1));

        loop {
            tokio::select! {
                _ = interval.tick() => {}
                _ = shutdown_rx.recv() => {
                    info!("Process monitor shutting down");
                    break;
                }
            }

            // Snapshot (name, pid) pairs under a short read lock so the
            // liveness probes below run without holding it.
            let running: Vec<(String, u32)> = {
                let instances = manager.instances.read().await;
                instances
                    .iter()
                    .filter_map(|(name, inst)| {
                        if inst.state == LifecycleState::Running {
                            inst.pid.map(|pid| (name.clone(), pid))
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            for (name, pid) in running {
                let alive = manager.process_driver.is_alive(pid).await;
                if alive {
                    continue;
                }

                warn!("Daemon '{}' (PID {}) has exited unexpectedly", name, pid);

                // Best-effort reap of the exit code.
                let exit_code = manager.process_driver.wait(pid).await.ok().flatten();

                let (should_restart, backoff) = {
                    let mut instances = manager.instances.write().await;
                    if let Some(inst) = instances.get_mut(&name) {
                        inst.state = LifecycleState::Failed;
                        inst.pid = None;
                        inst.exit_code = exit_code;
                        inst.stopped_at = Some(Utc::now());
                        inst.health_status = HealthStatus::Unknown;

                        // try_lock: skip persistence rather than block while
                        // holding the instances write lock.
                        if let Ok(reg) = manager.registry.try_lock() {
                            reg.update_state(inst).ok();
                        }

                        let spec = {
                            if let Ok(reg) = manager.registry.try_lock() {
                                reg.get_spec(&name).ok()
                            } else {
                                None
                            }
                        };

                        // NOTE(review): if the registry lock is contended the
                        // spec read yields None and no restart is attempted
                        // for this crash — confirm that is acceptable.
                        if let Some(spec) = spec {
                            let should = RestartEvaluator::should_restart(
                                &spec.restart_policy,
                                exit_code,
                                inst.restart_count,
                            );
                            let backoff = RestartEvaluator::backoff_duration(
                                &spec.restart_policy,
                                inst.restart_count,
                            );
                            inst.restart_count += 1;
                            (should, backoff)
                        } else {
                            (false, Duration::ZERO)
                        }
                    } else {
                        (false, Duration::ZERO)
                    }
                };

                // The dead process's health checker is no longer useful.
                {
                    let mut handles = manager.health_handles.lock().await;
                    if let Some(handle) = handles.remove(&name) {
                        handle.abort();
                    }
                }

                if should_restart {
                    info!("Restarting daemon '{}' after {:?} backoff", name, backoff);

                    let mgr = Arc::clone(&manager);
                    let daemon_name = name.clone();
                    tokio::spawn(async move {
                        tokio::time::sleep(backoff).await;
                        // Reset Failed -> Stopped so start_daemon's lifecycle
                        // transition succeeds.
                        {
                            let mut instances = mgr.instances.write().await;
                            if let Some(inst) = instances.get_mut(&daemon_name) {
                                inst.state = LifecycleState::Stopped;
                            }
                        }
                        if let Err(e) = mgr.start_daemon(&daemon_name).await {
                            error!("Failed to restart daemon '{}': {}", daemon_name, e);
                        }
                    });
                }
            }
        }
    }
669
    /// Background task that periodically probes one daemon's health and
    /// updates its `health_status`.
    ///
    /// Waits out `start_period_secs` first, then probes every
    /// `interval_secs`; the daemon is marked Unhealthy only after `retries`
    /// consecutive failures. Exits on the shutdown broadcast (the task is
    /// also aborted externally on stop/remove).
    async fn run_health_check(
        instances: Arc<RwLock<HashMap<String, DaemonInstance>>>,
        registry: Arc<Mutex<Registry>>,
        daemon_name: String,
        health_spec: crate::daemon::HealthCheckSpec,
        mut shutdown_rx: broadcast::Receiver<()>,
    ) {
        use crate::health;

        // Grace period before the first probe so slow-starting daemons are
        // not immediately flagged.
        if health_spec.start_period_secs > 0 {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_secs(health_spec.start_period_secs)) => {}
                _ = shutdown_rx.recv() => return,
            }
        }

        let checker = health::create_checker(health_spec.clone());
        // NOTE(review): interval_secs == 0 would make this loop probe
        // back-to-back — confirm the spec validates a minimum interval.
        let interval = Duration::from_secs(health_spec.interval_secs);
        let max_failures = health_spec.retries;
        let mut consecutive_failures: u32 = 0;

        loop {
            tokio::select! {
                _ = tokio::time::sleep(interval) => {}
                _ = shutdown_rx.recv() => break,
            }

            // A checker error counts the same as an unhealthy probe.
            let result = checker.check().await;
            let status = match result {
                Ok(s) => s,
                Err(e) => {
                    warn!("Health check error for '{}': {}", daemon_name, e);
                    HealthStatus::Unhealthy
                }
            };

            match status {
                HealthStatus::Healthy => {
                    // Any success resets the failure streak.
                    consecutive_failures = 0;
                    let mut insts = instances.write().await;
                    if let Some(inst) = insts.get_mut(&daemon_name) {
                        // Only a still-Running daemon can be marked Healthy.
                        if inst.state == LifecycleState::Running {
                            inst.health_status = HealthStatus::Healthy;
                        }
                    }
                }
                HealthStatus::Unhealthy => {
                    consecutive_failures += 1;
                    if consecutive_failures >= max_failures {
                        warn!(
                            "Daemon '{}' is unhealthy after {} consecutive failures",
                            daemon_name, consecutive_failures
                        );
                        let mut insts = instances.write().await;
                        if let Some(inst) = insts.get_mut(&daemon_name) {
                            inst.health_status = HealthStatus::Unhealthy;
                            // Persist best-effort; skip if the registry is
                            // currently locked elsewhere.
                            if let Ok(reg) = registry.try_lock() {
                                reg.update_state(inst).ok();
                            }
                        }
                    }
                }
                _ => {}
            }
        }
    }
738}
739
/// `'static` bundle of the manager's shared state (cheap Arc clones),
/// handed to scheduler callbacks which cannot capture `&DaemonManager`.
struct ManagerComponents {
    // Shared runtime state of every known daemon, keyed by spec name.
    instances: Arc<RwLock<HashMap<String, DaemonInstance>>>,
    // Persistent spec/state store.
    registry: Arc<Mutex<Registry>>,
    // Platform-specific process spawn/stop/kill/wait driver.
    process_driver: Arc<dyn ProcessDriver>,
    // Per-daemon stdout/stderr log file management.
    log_manager: Arc<LogManager>,
    // Broadcast used to stop spawned health-check tasks on shutdown.
    shutdown_tx: broadcast::Sender<()>,
    // JoinHandles of running health-check tasks, keyed by daemon name.
    health_handles: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
}
750
/// Starts `name` on behalf of a cron trigger, using the detached
/// `ManagerComponents` bundle instead of `&DaemonManager`.
///
/// Unlike `DaemonManager::start_daemon`, an already-Running daemon is a
/// successful no-op, and only Scheduled/Stopped daemons are started.
///
/// NOTE(review): this largely duplicates start_daemon — keep the two code
/// paths in sync when either changes.
async fn cron_start_daemon(components: &ManagerComponents, name: &str) -> Result<DaemonInstance> {
    let spec = {
        let reg = components.registry.lock().await;
        reg.get_spec(name)?
    };

    let mut instances = components.instances.write().await;
    let instance = instances
        .entry(name.to_string())
        .or_insert_with(|| DaemonInstance::new(name));

    // Cron firings tolerate an already-running daemon; any other active or
    // transitional state is an error.
    if instance.state == LifecycleState::Scheduled || instance.state == LifecycleState::Stopped {
        instance.state = instance.state.transition_to(LifecycleState::Starting)?;
    } else if instance.state == LifecycleState::Running {
        return Ok(instance.clone());
    } else {
        return Err(SyspulseError::InvalidStateTransition {
            from: format!("{:?}", instance.state),
            to: "Starting".to_string(),
        });
    }

    let (stdout_path, stderr_path) = components.log_manager.setup_log_files(name)?;
    instance.stdout_log = Some(stdout_path.clone());
    instance.stderr_log = Some(stderr_path.clone());

    let proc_info = components
        .process_driver
        .spawn(&spec, &stdout_path, &stderr_path)
        .await?;

    instance.pid = Some(proc_info.pid);
    instance.started_at = Some(Utc::now());
    instance.stopped_at = None;
    instance.exit_code = None;
    instance.state = instance.state.transition_to(LifecycleState::Running)?;

    if spec.health_check.is_some() {
        instance.health_status = HealthStatus::Unknown;
    } else {
        instance.health_status = HealthStatus::NotConfigured;
    }

    // Persist the running state so a manager restart can restore it.
    {
        let reg = components.registry.lock().await;
        reg.update_state(instance)?;
    }

    let result = instance.clone();
    // Release the write lock before spawning the health task, which itself
    // locks `instances`.
    drop(instances);

    if let Some(ref health_spec) = spec.health_check {
        let daemon_name = name.to_string();
        let shutdown_rx = components.shutdown_tx.subscribe();
        let insts = Arc::clone(&components.instances);
        let registry = Arc::clone(&components.registry);
        let hs: crate::daemon::HealthCheckSpec = health_spec.clone();

        let handle = tokio::spawn(async move {
            DaemonManager::run_health_check(insts, registry, daemon_name, hs, shutdown_rx).await;
        });

        let mut handles = components.health_handles.lock().await;
        handles.insert(name.to_string(), handle);
    }

    info!(
        "Cron-started daemon '{}' with PID {}",
        name,
        result.pid.unwrap_or(0)
    );
    Ok(result)
}
828
829fn error_response(e: SyspulseError) -> Response {
830 let code = match &e {
831 SyspulseError::DaemonNotFound(_) => 404,
832 SyspulseError::DaemonAlreadyExists(_) => 409,
833 SyspulseError::InvalidStateTransition { .. } => 409,
834 SyspulseError::Process(_) => 500,
835 SyspulseError::HealthCheck(_) => 500,
836 SyspulseError::Ipc(_) => 500,
837 SyspulseError::Registry(_) => 500,
838 SyspulseError::Config(_) => 400,
839 SyspulseError::Scheduler(_) => 500,
840 SyspulseError::Io(_) => 500,
841 SyspulseError::Serialization(_) => 400,
842 SyspulseError::Database(_) => 500,
843 SyspulseError::Timeout(_) => 504,
844 };
845 Response::Error {
846 code,
847 message: e.to_string(),
848 }
849}