Skip to main content

pingora_core/services/
mod.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! The service interface
//!
//! A service to the pingora server is just something that runs forever until the server
//! shuts down.
//!
//! Two types of services are particularly useful:
//! - services that are listening to some (TCP) endpoints
//! - services that are just running in the background.
23
24use async_trait::async_trait;
25use daggy::Walker;
26use daggy::{petgraph::visit::Topo, Dag, NodeIndex};
27use log::{error, info, warn};
28use parking_lot::Mutex;
29use std::borrow::Borrow;
30use std::sync::Arc;
31use std::sync::Weak;
32use std::time::Duration;
33use tokio::sync::watch;
34
35#[cfg(unix)]
36use crate::server::ListenFds;
37use crate::server::ShutdownWatch;
38
39pub mod background;
40pub mod listening;
41
42/// A notification channel for signaling when a service has become ready.
43///
44/// Services can use this to notify other services that may depend on them
45/// that they have successfully started and are ready to serve requests.
46///
47/// # Example
48///
49/// ```rust,ignore
50/// use pingora_core::services::ServiceReadyNotifier;
51///
52/// async fn my_service(ready_notifier: ServiceReadyNotifier) {
53///     // Perform initialization...
54///
55///     // Signal that the service is ready
56///     ready_notifier.notify_ready();
57///
58///     // Continue with main service loop...
59/// }
60/// ```
61pub struct ServiceReadyNotifier {
62    sender: watch::Sender<bool>,
63}
64
65impl Drop for ServiceReadyNotifier {
66    /// In the event that the notifier is dropped before notifying that the
67    /// service is ready, we opt to signal ready anyway
68    fn drop(&mut self) {
69        // Ignore errors - if there are no receivers, that's fine
70        let _ = self.sender.send(true);
71    }
72}
73
74impl ServiceReadyNotifier {
75    /// Creates a new ServiceReadyNotifier from a watch sender.
76    /// You will not need to create one of these for normal usage, but being
77    /// able to is useful for testing.
78    pub fn new(sender: watch::Sender<bool>) -> Self {
79        Self { sender }
80    }
81
82    /// Notifies dependent services that this service is ready.
83    ///
84    /// Consumes the notifier to ensure ready is only signaled once.
85    pub fn notify_ready(self) {
86        // Dropping the notifier will signal that the service is ready
87        drop(self);
88    }
89}
90
/// A receiver for watching when a service becomes ready.
///
/// This is the receiving side that pairs with [`ServiceReadyNotifier`]: the
/// notifier publishes `true` on the channel once the service signals
/// readiness (or the notifier is dropped).
pub type ServiceReadyWatch = watch::Receiver<bool>;
93
/// A handle to a service in the server.
///
/// This is returned by [`crate::server::Server::add_service()`] and provides
/// methods to declare that other services depend on this one.
///
/// The handle only holds a weak reference to the dependency graph, so it does
/// not keep the graph alive; once the graph has been dropped, attempts to add
/// dependencies are logged and ignored.
///
/// # Example
///
/// ```rust,ignore
/// let db_handle = server.add_service(database_service);
/// let cache_handle = server.add_service(cache_service);
///
/// let api_handle = server.add_service(api_service);
/// api_handle.add_dependency(&db_handle);
/// api_handle.add_dependency(&cache_handle);
/// ```
#[derive(Debug, Clone)]
pub struct ServiceHandle {
    /// Index of this service's node in the dependency graph.
    pub(crate) id: NodeIndex,
    /// Name of the service, exposed through [`ServiceHandle::name()`].
    name: String,
    /// Watcher that observes this service's readiness.
    ready_watch: ServiceReadyWatch,
    /// Weak reference to the shared dependency graph this handle adds edges to.
    dependencies: Weak<Mutex<DependencyGraph>>,
}
116
/// Internal representation of a dependency relationship.
///
/// One of these is stored as the weight of every node in the
/// `DependencyGraph`, so it describes a service that others may wait on.
#[derive(Debug, Clone)]
pub(crate) struct ServiceDependency {
    /// Name of the service this node represents.
    pub name: String,
    /// Watcher that observes the readiness of the service.
    pub ready_watch: ServiceReadyWatch,
}
123
124impl ServiceHandle {
125    /// Creates a new ServiceHandle with the given ID, name, and readiness watcher.
126    pub(crate) fn new(
127        id: NodeIndex,
128        name: String,
129        ready_watch: ServiceReadyWatch,
130        dependencies: &Arc<Mutex<DependencyGraph>>,
131    ) -> Self {
132        Self {
133            id,
134            name,
135            ready_watch,
136            dependencies: Arc::downgrade(dependencies),
137        }
138    }
139
140    #[cfg(test)]
141    fn get_dependencies(&self) -> Vec<ServiceDependency> {
142        let Some(deps_lock) = self.dependencies.upgrade() else {
143            return Vec::new();
144        };
145
146        let deps = deps_lock.lock();
147        deps.get_dependencies(self.id)
148    }
149
150    /// Returns the name of the service.
151    pub fn name(&self) -> &str {
152        &self.name
153    }
154
155    /// Returns a clone of the readiness watcher for this service.
156    #[allow(dead_code)]
157    pub(crate) fn ready_watch(&self) -> ServiceReadyWatch {
158        self.ready_watch.clone()
159    }
160
161    /// Declares that this service depends on another service.
162    ///
163    /// This service will not start until the specified dependency has started
164    /// and signaled readiness.
165    ///
166    /// # Example
167    ///
168    /// ```rust,ignore
169    /// let db_id = server.add_service(database_service);
170    /// let api_id = server.add_service(api_service);
171    ///
172    /// // API service depends on database
173    /// api_id.add_dependency(&db_id);
174    /// ```
175    pub fn add_dependency(&self, dependency: impl Borrow<ServiceHandle>) {
176        let Some(deps_lock) = self.dependencies.upgrade() else {
177            warn!("Attempted to add a dependency after the dependency tree was dropped");
178            return;
179        };
180
181        let mut deps = deps_lock.lock();
182        if let Err(e) = deps.add_dependency(self.id, dependency.borrow().id) {
183            error!("Error creating dependency edge: {e}");
184        }
185    }
186
187    /// Declares that this service depends on the given other services.
188    ///
189    /// This service will not start until the specified dependencies have
190    /// started and signaled readiness.
191    ///
192    /// # Example
193    ///
194    /// ```rust,ignore
195    /// let db_id = server.add_service(database_service);
196    /// let cache_id = server.add_service(cache_service);
197    /// let api_id = server.add_service(api_service);
198    ///
199    /// // API service depends on database
200    /// api_id.add_dependencies(&[&db_id, &cache_id]);
201    /// ```
202    pub fn add_dependencies<'a, D>(&self, dependencies: impl IntoIterator<Item = D>)
203    where
204        D: Borrow<ServiceHandle> + 'a,
205    {
206        for dependency in dependencies {
207            self.add_dependency(dependency);
208        }
209    }
210}
211
/// Helper for validating service dependency graphs using daggy.
///
/// Every node carries a [`ServiceDependency`]; edges carry no data and point
/// from a dependency to the service that depends on it.
pub(crate) struct DependencyGraph {
    /// The directed acyclic graph structure from daggy.
    dag: Dag<ServiceDependency, ()>,
}
217
218impl DependencyGraph {
219    /// Creates a new dependency graph.
220    pub(crate) fn new() -> Self {
221        Self { dag: Dag::new() }
222    }
223
224    /// Adds a service node to the graph.
225    ///
226    /// This should be called for all services first, before adding edges.
227    pub(crate) fn add_node(&mut self, name: String, ready_watch: ServiceReadyWatch) -> NodeIndex {
228        self.dag.add_node(ServiceDependency { name, ready_watch })
229    }
230    /// Adds a dependency edge from one service to another.
231    ///
232    /// Returns an error if adding this dependency would create a cycle or reference
233    /// a non-existent service.
234    pub(crate) fn add_dependency(
235        &mut self,
236        dependent_service_node_idx: NodeIndex,
237        dependency_service_node_idx: NodeIndex,
238    ) -> Result<(), String> {
239        // Try to add edge (from dependency to dependent)
240        // daggy will return an error if this would create a cycle
241        if let Err(cycle) =
242            self.dag
243                .add_edge(dependency_service_node_idx, dependent_service_node_idx, ())
244        {
245            return Err(format!(
246                "Circular service dependency detected between {} and {} creating cycle: {cycle}",
247                self.dag[dependency_service_node_idx].name,
248                self.dag[dependent_service_node_idx].name
249            ));
250        }
251
252        Ok(())
253    }
254
255    /// Returns services in topological order (dependencies before dependents).
256    ///
257    /// This ordering ensures that services are started in the correct order.
258    /// Returns service IDs in the correct startup order.
259    pub(crate) fn topological_sort(&self) -> Result<Vec<(NodeIndex, ServiceDependency)>, String> {
260        // Use daggy's built-in topological walker
261        let mut sorted = Vec::new();
262        let mut topo = Topo::new(&self.dag);
263
264        while let Some(service_id) = topo.next(&self.dag) {
265            sorted.push((service_id, self.dag[service_id].clone()));
266        }
267
268        Ok(sorted)
269    }
270
271    pub(crate) fn get_dependencies(&self, service_id: NodeIndex) -> Vec<ServiceDependency> {
272        self.dag
273            .parents(service_id)
274            .iter(&self.dag)
275            .map(|(_, n)| self.dag[n].clone())
276            .collect()
277    }
278}
279
280impl Default for DependencyGraph {
281    fn default() -> Self {
282        Self::new()
283    }
284}
285
/// The service interface for services that other services may depend on.
///
/// Compared to [`Service`], `start_service()` additionally receives a
/// [`ServiceReadyNotifier`], which lets the service decide exactly when it
/// reports itself as ready. Every [`Service`] implements this trait through a
/// blanket implementation that signals readiness immediately before starting.
#[async_trait]
pub trait ServiceWithDependents: Send + Sync {
    /// This function will be called when the server is ready to start the service.
    ///
    /// Override this method if you need to control exactly when the service signals readiness
    /// (e.g., after async initialization is complete).
    ///
    /// # Arguments
    ///
    /// - `fds` (Unix only): a collection of listening file descriptors. During zero downtime restart
    ///   the `fds` would contain the listening sockets passed from the old service, services should
    ///   take the sockets they need to use then. If the sockets the service looks for don't appear in
    ///   the collection, the service should create its own listening sockets and then put them into
    ///   the collection in order for them to be passed to the next server.
    /// - `shutdown`: the shutdown signal this server would receive.
    /// - `listeners_per_fd`: number of listener tasks to spawn per file descriptor.
    /// - `ready_notifier`: notifier to signal when the service is ready. Services with
    ///   dependents should call `ready_notifier.notify_ready()` once they are fully initialized.
    ///   Dropping the notifier without calling it also signals readiness.
    async fn start_service(
        &mut self,
        #[cfg(unix)] fds: Option<ListenFds>,
        shutdown: ShutdownWatch,
        listeners_per_fd: usize,
        ready_notifier: ServiceReadyNotifier,
    );

    /// The name of the service, just for logging and naming the threads assigned to this service
    ///
    /// Note that due to the limit of the underlying system, only the first 16 chars will be used
    fn name(&self) -> &str;

    /// The preferred number of threads to run this service
    ///
    /// If `None`, the global setting will be used
    fn threads(&self) -> Option<usize> {
        None
    }

    /// This is currently called to inform the service about the delay it
    /// experienced while waiting on its dependencies. Default behavior
    /// is to log the time.
    ///
    /// TODO. It would be nice if this function was called intermittently by
    /// the server while the service was waiting, to give live updates and
    /// allow the service to decide whether to keep waiting, continue anyway,
    /// or exit
    fn on_startup_delay(&self, time_waited: Duration) {
        info!(
            "Service {} spent {}ms waiting on dependencies",
            self.name(),
            time_waited.as_millis()
        );
    }
}
340
341#[async_trait]
342impl<S> ServiceWithDependents for S
343where
344    S: Service,
345{
346    async fn start_service(
347        &mut self,
348        #[cfg(unix)] fds: Option<ListenFds>,
349        shutdown: ShutdownWatch,
350        listeners_per_fd: usize,
351        ready_notifier: ServiceReadyNotifier,
352    ) {
353        // Signal ready immediately
354        ready_notifier.notify_ready();
355
356        S::start_service(
357            self,
358            #[cfg(unix)]
359            fds,
360            shutdown,
361            listeners_per_fd,
362        )
363        .await
364    }
365
366    fn name(&self) -> &str {
367        S::name(self)
368    }
369
370    fn threads(&self) -> Option<usize> {
371        S::threads(self)
372    }
373
374    fn on_startup_delay(&self, time_waited: Duration) {
375        S::on_startup_delay(self, time_waited)
376    }
377}
378
/// The service interface
///
/// Implement this trait for services that do not need to control when they
/// are reported as ready. Such services automatically implement
/// [`ServiceWithDependents`], which signals readiness right before calling
/// [`Service::start_service()`].
#[async_trait]
pub trait Service: Sync + Send {
    /// Start the service without readiness notification.
    ///
    /// This is a simpler version of [`ServiceWithDependents::start_service()`] for services
    /// that don't need to control when they signal readiness. The default implementation does
    /// nothing.
    ///
    /// Most services should override this method instead of
    /// [`ServiceWithDependents::start_service()`].
    ///
    /// # Arguments
    ///
    /// - `fds` (Unix only): a collection of listening file descriptors.
    /// - `shutdown`: the shutdown signal this server would receive.
    /// - `listeners_per_fd`: number of listener tasks to spawn per file descriptor.
    async fn start_service(
        &mut self,
        #[cfg(unix)] _fds: Option<ListenFds>,
        _shutdown: ShutdownWatch,
        _listeners_per_fd: usize,
    ) {
        // Default: do nothing
    }

    /// The name of the service, just for logging and naming the threads assigned to this service
    ///
    /// Note that due to the limit of the underlying system, only the first 16 chars will be used
    fn name(&self) -> &str;

    /// The preferred number of threads to run this service
    ///
    /// If `None`, the global setting will be used
    fn threads(&self) -> Option<usize> {
        None
    }

    /// This is currently called to inform the service about the delay it
    /// experienced while waiting on its dependencies. Default behavior
    /// is to log the time.
    ///
    /// TODO. It would be nice if this function was called intermittently by
    /// the server while the service was waiting, to give live updates and
    /// allow the service to decide whether to keep waiting, continue anyway,
    /// or exit
    fn on_startup_delay(&self, time_waited: Duration) {
        info!(
            "Service {} spent {}ms waiting on dependencies",
            self.name(),
            time_waited.as_millis()
        );
    }
}
431
#[cfg(test)]
mod tests {
    use super::*;

    /// The shared, lockable graph that `ServiceHandle`s point at.
    type SharedGraph = Arc<Mutex<DependencyGraph>>;

    /// Creates an empty shared graph.
    fn shared_graph() -> SharedGraph {
        Arc::new(Mutex::new(DependencyGraph::new()))
    }

    /// Adds a not-yet-ready service node called `name` to `graph`.
    ///
    /// The sender is handed back so the caller can keep the channel open.
    fn add_service(graph: &mut DependencyGraph, name: &str) -> (NodeIndex, watch::Sender<bool>) {
        let (ready_tx, ready_rx) = watch::channel(false);
        let node = graph.add_node(name.to_string(), ready_rx);
        (node, ready_tx)
    }

    /// Adds a not-yet-ready service node to a shared graph and wraps it in a handle.
    fn add_handle(graph: &SharedGraph, name: &str) -> (ServiceHandle, watch::Sender<bool>) {
        let (ready_tx, ready_rx) = watch::channel(false);
        let node = graph.lock().add_node(name.to_string(), ready_rx.clone());
        let handle = ServiceHandle::new(node, name.to_string(), ready_rx, graph);
        (handle, ready_tx)
    }

    /// Returns the position of the service called `name` in a startup order.
    fn position_of(order: &[(NodeIndex, ServiceDependency)], name: &str) -> usize {
        order.iter().position(|(_, dep)| dep.name == name).unwrap()
    }

    #[test]
    fn test_service_handle_creation() {
        let graph = shared_graph();
        let (ready_tx, ready_rx) = watch::channel(false);
        let handle = ServiceHandle::new(
            NodeIndex::new(0),
            "test_service".to_string(),
            ready_rx,
            &graph,
        );

        assert_eq!(handle.id, NodeIndex::new(0));
        assert_eq!(handle.name(), "test_service");

        // The watcher can be cloned out of the handle and starts as "not ready".
        let observer = handle.ready_watch();
        assert!(!*observer.borrow());

        // A readiness signal must be visible through the cloned watcher.
        ready_tx.send(true).ok();
        assert!(*observer.borrow());
    }

    #[test]
    fn test_service_handle_add_dependency() {
        let graph = shared_graph();
        let (dependency, dependency_tx) = add_handle(&graph, "dependency");
        let (main, _main_tx) = add_handle(&graph, "main");

        main.add_dependency(&dependency);

        // Exactly the declared dependency is reported back.
        let found = main.get_dependencies();
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].name, "dependency");

        // The stored watcher tracks the dependency's readiness.
        assert!(!*found[0].ready_watch.borrow());
        dependency_tx.send(true).ok();
        assert!(*found[0].ready_watch.borrow());
    }

    #[test]
    fn test_service_handle_multiple_dependencies() {
        let graph = shared_graph();
        let (dep1, _dep1_tx) = add_handle(&graph, "dep1");
        let (dep2, _dep2_tx) = add_handle(&graph, "dep2");
        let (main, _main_tx) = add_handle(&graph, "main");

        main.add_dependency(&dep1);
        main.add_dependency(&dep2);

        let found = main.get_dependencies();
        assert_eq!(found.len(), 2);

        let names: Vec<&str> = found.iter().map(|dep| dep.name.as_str()).collect();
        assert!(names.contains(&"dep1"));
        assert!(names.contains(&"dep2"));
    }

    #[test]
    fn test_single_service_no_dependencies() {
        let mut graph = DependencyGraph::new();
        let (_only, _only_tx) = add_service(&mut graph, "service1");

        let order = graph.topological_sort().unwrap();
        assert_eq!(order.len(), 1);
        assert_eq!(order[0].1.name, "service1");
    }

    #[test]
    fn test_simple_dependency_chain() {
        let mut graph = DependencyGraph::new();
        let (first, _first_tx) = add_service(&mut graph, "service1");
        let (second, _second_tx) = add_service(&mut graph, "service2");
        let (third, _third_tx) = add_service(&mut graph, "service3");

        // service1 <- service2 <- service3
        graph.add_dependency(second, first).unwrap();
        graph.add_dependency(third, second).unwrap();

        let order = graph.topological_sort().unwrap();
        let names: Vec<&str> = order.iter().map(|(_, dep)| dep.name.as_str()).collect();
        assert_eq!(names.len(), 3);
        assert_eq!(names[0], "service1");
        assert_eq!(names[1], "service2");
        assert_eq!(names[2], "service3");
    }

    #[test]
    fn test_diamond_dependency() {
        let mut graph = DependencyGraph::new();
        let (db, _db_tx) = add_service(&mut graph, "db");
        let (cache, _cache_tx) = add_service(&mut graph, "cache");
        let (api, _api_tx) = add_service(&mut graph, "api");

        // api waits for both db and cache
        graph.add_dependency(api, db).unwrap();
        graph.add_dependency(api, cache).unwrap();

        let order = graph.topological_sort().unwrap();
        assert_eq!(order.len(), 3);

        // api must be last; the relative order of db and cache is unspecified.
        assert_eq!(order[2].1.name, "api");
        let leading: Vec<&str> = order[..2].iter().map(|(_, dep)| dep.name.as_str()).collect();
        assert!(leading.contains(&"db"));
        assert!(leading.contains(&"cache"));
    }

    #[test]
    #[should_panic(expected = "node indices out of bounds")]
    fn test_missing_dependency() {
        let mut graph = DependencyGraph::new();
        let (existing, _existing_tx) = add_service(&mut graph, "service1");
        let missing = NodeIndex::new(999);

        // Depending on a node that was never added is expected to panic.
        let _ = graph.add_dependency(existing, missing);
    }

    #[test]
    fn test_circular_dependency_self() {
        let mut graph = DependencyGraph::new();
        let (only, _only_tx) = add_service(&mut graph, "service1");

        // A service cannot depend on itself.
        let message = graph
            .add_dependency(only, only)
            .expect_err("self-dependency must be rejected");
        assert!(message.contains("Circular"));
    }

    #[test]
    fn test_circular_dependency_two_services() {
        let mut graph = DependencyGraph::new();
        let (first, _first_tx) = add_service(&mut graph, "service1");
        let (second, _second_tx) = add_service(&mut graph, "service2");

        // The first edge is fine; the reverse edge closes a cycle.
        graph.add_dependency(first, second).unwrap();
        let message = graph
            .add_dependency(second, first)
            .expect_err("two-node cycle must be rejected");
        assert!(message.contains("Circular"));
    }

    #[test]
    fn test_circular_dependency_three_services() {
        let mut graph = DependencyGraph::new();
        let (first, _first_tx) = add_service(&mut graph, "service1");
        let (second, _second_tx) = add_service(&mut graph, "service2");
        let (third, _third_tx) = add_service(&mut graph, "service3");

        // The third edge would close the loop.
        graph.add_dependency(first, second).unwrap();
        graph.add_dependency(second, third).unwrap();
        let message = graph
            .add_dependency(third, first)
            .expect_err("three-node cycle must be rejected");
        assert!(message.contains("Circular"));
    }

    #[test]
    fn test_complex_valid_graph() {
        let mut graph = DependencyGraph::new();

        // Dependency layout:
        //   db, cache - no deps
        //   auth -> db
        //   api -> db, cache, auth
        //   frontend -> api
        let (db, _db_tx) = add_service(&mut graph, "db");
        let (cache, _cache_tx) = add_service(&mut graph, "cache");
        let (auth, _auth_tx) = add_service(&mut graph, "auth");
        let (api, _api_tx) = add_service(&mut graph, "api");
        let (frontend, _frontend_tx) = add_service(&mut graph, "frontend");

        graph.add_dependency(auth, db).unwrap();
        graph.add_dependency(api, db).unwrap();
        graph.add_dependency(api, cache).unwrap();
        graph.add_dependency(api, auth).unwrap();
        graph.add_dependency(frontend, api).unwrap();

        let order = graph.topological_sort().unwrap();

        // Look each service up by name and compare positions.
        let db_pos = position_of(&order, "db");
        let cache_pos = position_of(&order, "cache");
        let auth_pos = position_of(&order, "auth");
        let api_pos = position_of(&order, "api");
        let frontend_pos = position_of(&order, "frontend");

        assert!(db_pos < auth_pos);
        assert!(auth_pos < api_pos);
        assert!(db_pos < api_pos);
        assert!(cache_pos < api_pos);
        assert!(api_pos < frontend_pos);
    }
}