planetary_api/
lib.rs

1//! The implementation of the Planetary TES API server.
2
3use std::future::Future;
4use std::sync::Arc;
5use std::time::Duration;
6
7use bon::Builder;
8use planetary_db::Database;
9use planetary_server::DEFAULT_ADDRESS;
10use planetary_server::DEFAULT_PORT;
11use reqwest::Client;
12use secrecy::SecretString;
13use tes::v1::types::responses::ServiceInfo;
14use tokio_retry2::strategy::ExponentialFactorBackoff;
15use tokio_retry2::strategy::MaxInterval;
16use tracing::warn;
17use url::Url;
18
19mod info;
20mod tasks;
21
22/// Gets an iterator over the retry durations for network operations.
23///
24/// Retries use an exponential power of 2 backoff, starting at 1 second with
25/// a maximum duration of 60 seconds.
26fn retry_durations() -> impl Iterator<Item = Duration> {
27    const INITIAL_DELAY_MILLIS: u64 = 1000;
28    const BASE_FACTOR: f64 = 2.0;
29    const MAX_DURATION: Duration = Duration::from_secs(60);
30    const RETRIES: usize = 5;
31
32    ExponentialFactorBackoff::from_millis(INITIAL_DELAY_MILLIS, BASE_FACTOR)
33        .max_duration(MAX_DURATION)
34        .take(RETRIES)
35}
36
37/// Helper for notifying that a network operation failed and will be retried.
38fn notify_retry(e: &reqwest::Error, duration: Duration) {
39    warn!(
40        "network operation failed: {e} (retrying after {duration} seconds)",
41        duration = duration.as_secs()
42    );
43}
44
45/// Represents information about the orchestrator service.
46struct OrchestratorServiceInfo {
47    /// The URL of the orchestrator service.
48    url: Url,
49    /// The orchestrator service API key.
50    api_key: SecretString,
51}
52
53/// The state for the server.
54#[derive(Clone)]
55struct State {
56    /// The HTTP client for communicating with the orchestration service.
57    client: Arc<Client>,
58
59    /// The service information.
60    info: Arc<ServiceInfo>,
61
62    /// The TES database.
63    database: Arc<dyn Database>,
64
65    /// The orchestrator service information.
66    orchestrator: Arc<OrchestratorServiceInfo>,
67}
68
69impl State {
70    /// Constructs a new state given the service info, database, and
71    /// orchestrator service sender.
72    pub fn new(
73        info: ServiceInfo,
74        database: Arc<dyn Database>,
75        orchestrator_url: Url,
76        orchestrator_api_key: SecretString,
77    ) -> Self {
78        Self {
79            client: Arc::new(Client::new()),
80            info: Arc::new(info),
81            database: database.clone(),
82            orchestrator: Arc::new(OrchestratorServiceInfo {
83                url: orchestrator_url,
84                api_key: orchestrator_api_key,
85            }),
86        }
87    }
88}
89
90/// The TES API server.
91#[derive(Clone, Builder)]
92pub struct Server {
93    /// The address to bind the server to.
94    #[builder(into, default = DEFAULT_ADDRESS)]
95    address: String,
96
97    /// The port to bind the server to.
98    #[builder(into, default = DEFAULT_PORT)]
99    port: u16,
100
101    /// The service information.
102    #[builder(into)]
103    info: ServiceInfo,
104
105    /// The TES database to use for the server.
106    #[builder(name = "shared_database")]
107    database: Arc<dyn Database>,
108
109    /// The Planetary orchestrator service URL.
110    #[builder(into)]
111    orchestrator_url: Url,
112
113    /// The Planetary orchestrator service API key.
114    #[builder(into)]
115    orchestrator_api_key: SecretString,
116}
117
118impl<S: server_builder::State> ServerBuilder<S> {
119    /// The TES database to use for the server.
120    ///
121    /// This is a convenience method for setting the shared database server
122    /// from any type that implements `Database`.
123    pub fn database(
124        self,
125        database: impl Database + 'static,
126    ) -> ServerBuilder<server_builder::SetSharedDatabase<S>>
127    where
128        S::SharedDatabase: server_builder::IsUnset,
129    {
130        self.shared_database(Arc::new(database))
131    }
132}
133
134impl Server {
135    /// Runs the server.
136    pub async fn run<F>(self, shutdown: F) -> anyhow::Result<()>
137    where
138        F: Future<Output = ()> + Send + 'static,
139    {
140        let server = planetary_server::Server::builder()
141            .address(self.address)
142            .port(self.port)
143            .routers(bon::vec![info::router(), tasks::router()])
144            .build();
145
146        let state = State::new(
147            self.info,
148            self.database,
149            self.orchestrator_url,
150            self.orchestrator_api_key,
151        );
152        server.run(state, shutdown).await?;
153        Ok(())
154    }
155}