planetary_monitor/
lib.rs

1//! Implements the Planetary task monitor.
2
3use std::future::Future;
4use std::sync::Arc;
5use std::time::Duration;
6
7use bon::Builder;
8use planetary_db::Database;
9use planetary_server::DEFAULT_ADDRESS;
10use planetary_server::DEFAULT_PORT;
11use secrecy::SecretString;
12use url::Url;
13
14use crate::monitor::Monitor;
15use crate::monitor::OrchestratorServiceInfo;
16
17mod monitor;
18
19/// The task monitor server.
20#[derive(Clone, Builder)]
21pub struct Server {
22    /// The address to bind the server to.
23    #[builder(into, default = DEFAULT_ADDRESS)]
24    address: String,
25
26    /// The port to bind the server to.
27    #[builder(into, default = DEFAULT_PORT)]
28    port: u16,
29
30    /// The TES database to use for the server.
31    #[builder(name = "shared_database")]
32    database: Arc<dyn Database>,
33
34    /// The Kubernetes namespace for the Planetary services.
35    ///
36    /// Defaults to `planetary`.
37    #[builder(into)]
38    planetary_namespace: Option<String>,
39
40    /// The Kubernetes namespace to use for TES task resources.
41    ///
42    /// Defaults to `planetary-tasks`.
43    #[builder(into)]
44    tasks_namespace: Option<String>,
45
46    /// The interval for which the monitor should check the cluster state.
47    ///
48    /// Defaults to 60 seconds.
49    #[builder(into)]
50    interval: Duration,
51
52    /// The Planetary orchestrator service URL.
53    #[builder(into)]
54    orchestrator_url: Url,
55
56    /// The Planetary orchestrator service API key.
57    #[builder(into)]
58    orchestrator_api_key: SecretString,
59}
60
61impl<S: server_builder::State> ServerBuilder<S> {
62    /// The TES database to use for the server.
63    ///
64    /// This is a convenience method for setting the shared database server
65    /// from any type that implements `Database`.
66    pub fn database(
67        self,
68        database: impl Database + 'static,
69    ) -> ServerBuilder<server_builder::SetSharedDatabase<S>>
70    where
71        S::SharedDatabase: server_builder::IsUnset,
72    {
73        self.shared_database(Arc::new(database))
74    }
75}
76
77impl Server {
78    /// Runs the server.
79    pub async fn run<F>(self, shutdown: F) -> anyhow::Result<()>
80    where
81        F: Future<Output = ()> + Send + 'static,
82    {
83        let server = planetary_server::Server::builder()
84            .address(self.address)
85            .port(self.port)
86            .build();
87
88        // Spawn the monitor
89        let monitor = Monitor::spawn(
90            self.database,
91            OrchestratorServiceInfo {
92                url: self.orchestrator_url,
93                api_key: self.orchestrator_api_key,
94            },
95            self.planetary_namespace,
96            self.tasks_namespace,
97            self.interval,
98        )
99        .await?;
100
101        // Run the server to completion
102        server.run((), shutdown).await?;
103
104        // Finally, shutdown the monitor
105        monitor.shutdown().await;
106        Ok(())
107    }
108}