1use std::future::Future;
4use std::sync::Arc;
5use std::time::Duration;
6
7use bon::Builder;
8use planetary_db::Database;
9use planetary_server::DEFAULT_ADDRESS;
10use planetary_server::DEFAULT_PORT;
11use reqwest::Client;
12use secrecy::SecretString;
13use tes::v1::types::responses::ServiceInfo;
14use tokio_retry2::strategy::ExponentialFactorBackoff;
15use tokio_retry2::strategy::MaxInterval;
16use tracing::warn;
17use url::Url;
18
19mod info;
20mod tasks;
21
22fn retry_durations() -> impl Iterator<Item = Duration> {
27 const INITIAL_DELAY_MILLIS: u64 = 1000;
28 const BASE_FACTOR: f64 = 2.0;
29 const MAX_DURATION: Duration = Duration::from_secs(60);
30 const RETRIES: usize = 5;
31
32 ExponentialFactorBackoff::from_millis(INITIAL_DELAY_MILLIS, BASE_FACTOR)
33 .max_duration(MAX_DURATION)
34 .take(RETRIES)
35}
36
37fn notify_retry(e: &reqwest::Error, duration: Duration) {
39 warn!(
40 "network operation failed: {e} (retrying after {duration} seconds)",
41 duration = duration.as_secs()
42 );
43}
44
45struct OrchestratorServiceInfo {
47 url: Url,
49 api_key: SecretString,
51}
52
53#[derive(Clone)]
55struct State {
56 client: Arc<Client>,
58
59 info: Arc<ServiceInfo>,
61
62 database: Arc<dyn Database>,
64
65 orchestrator: Arc<OrchestratorServiceInfo>,
67}
68
69impl State {
70 pub fn new(
73 info: ServiceInfo,
74 database: Arc<dyn Database>,
75 orchestrator_url: Url,
76 orchestrator_api_key: SecretString,
77 ) -> Self {
78 Self {
79 client: Arc::new(Client::new()),
80 info: Arc::new(info),
81 database: database.clone(),
82 orchestrator: Arc::new(OrchestratorServiceInfo {
83 url: orchestrator_url,
84 api_key: orchestrator_api_key,
85 }),
86 }
87 }
88}
89
90#[derive(Clone, Builder)]
92pub struct Server {
93 #[builder(into, default = DEFAULT_ADDRESS)]
95 address: String,
96
97 #[builder(into, default = DEFAULT_PORT)]
99 port: u16,
100
101 #[builder(into)]
103 info: ServiceInfo,
104
105 #[builder(name = "shared_database")]
107 database: Arc<dyn Database>,
108
109 #[builder(into)]
111 orchestrator_url: Url,
112
113 #[builder(into)]
115 orchestrator_api_key: SecretString,
116}
117
118impl<S: server_builder::State> ServerBuilder<S> {
119 pub fn database(
124 self,
125 database: impl Database + 'static,
126 ) -> ServerBuilder<server_builder::SetSharedDatabase<S>>
127 where
128 S::SharedDatabase: server_builder::IsUnset,
129 {
130 self.shared_database(Arc::new(database))
131 }
132}
133
134impl Server {
135 pub async fn run<F>(self, shutdown: F) -> anyhow::Result<()>
137 where
138 F: Future<Output = ()> + Send + 'static,
139 {
140 let server = planetary_server::Server::builder()
141 .address(self.address)
142 .port(self.port)
143 .routers(bon::vec![info::router(), tasks::router()])
144 .build();
145
146 let state = State::new(
147 self.info,
148 self.database,
149 self.orchestrator_url,
150 self.orchestrator_api_key,
151 );
152 server.run(state, shutdown).await?;
153 Ok(())
154 }
155}