1use crate::api::DevicePath;
2
3mod case_insensitive_str;
4
5mod discovery;
6pub use discovery::{BoundServer as BoundDiscoveryServer, Server as DiscoveryServer};
7
8mod error;
9pub(crate) use error::{Error, Result};
10
11mod params;
12pub(crate) use params::ActionParams;
13
14mod response;
15
16#[macro_use]
17mod setup_page;
18
19mod transaction;
20pub(crate) use transaction::*;
21
22#[cfg(feature = "test")]
23pub(crate) mod test;
24
25use crate::Devices;
26#[cfg(feature = "camera")]
27use crate::api::Camera;
28use crate::api::{CargoServerInfo, DeviceType, ServerInfo};
29use crate::discovery::DEFAULT_DISCOVERY_PORT;
30use crate::response::ValueResponse;
31use axum::extract::{FromRequest, Path, Request};
32use axum::response::{Html, IntoResponse, Response};
33use axum::{Router, routing};
34use fnv::FnvHashSet;
35use futures::future::{BoxFuture, FutureExt};
36use http::StatusCode;
37use net_literals::addr;
38use serde::Deserialize;
39use socket2::{Domain, Protocol, Socket, Type};
40use std::collections::BTreeMap;
41use std::net::SocketAddr;
42use std::sync::{Arc, RwLock};
43use tokio::net::TcpListener;
44use tracing::Instrument;
45
46#[derive(Debug)]
48pub struct Server {
49 pub devices: Devices,
51 pub info: ServerInfo,
53 pub listen_addr: SocketAddr,
57 pub discovery_port: u16,
61}
62
63impl Server {
64 pub const fn new(info: ServerInfo) -> Self {
75 Self {
76 devices: Devices::default(),
77 info,
78 listen_addr: addr!("[::]:0"),
79 discovery_port: DEFAULT_DISCOVERY_PORT,
80 }
81 }
82}
83
84struct ServerHandler {
85 path: String,
86 params: ActionParams,
87}
88
89impl<S: Send + Sync> FromRequest<S> for ServerHandler {
90 type Rejection = Response;
91
92 async fn from_request(req: Request, state: &S) -> std::result::Result<Self, Self::Rejection> {
93 let path = req.uri().path().to_owned();
94 let params = ActionParams::from_request(req, state).await?;
95 Ok(Self { path, params })
96 }
97}
98
99impl ServerHandler {
100 async fn exec<Output>(
101 mut self,
102 make_response: impl AsyncFnOnce(ActionParams) -> Output,
103 ) -> axum::response::Result<Response>
104 where
105 ResponseWithTransaction<Output>: IntoResponse,
106 {
107 let request_transaction = RequestTransaction::extract(&mut self.params)?;
108 let response_transaction =
109 ResponseTransaction::new(request_transaction.client_transaction_id);
110
111 let span = tracing::error_span!(
112 "handle_alpaca_request",
113 path = self.path,
114 client_id = request_transaction.client_id,
115 client_transaction_id = request_transaction.client_transaction_id,
116 server_transaction_id = response_transaction.server_transaction_id,
117 );
118
119 Ok(async move {
120 tracing::debug!(params = ?self.params, "Received request");
121
122 ResponseWithTransaction {
123 transaction: response_transaction,
124 response: make_response(self.params).await,
125 }
126 }
127 .instrument(span)
128 .await
129 .into_response())
130 }
131}
132
133#[derive(derive_more::Debug)]
135pub struct BoundServer {
136 #[debug(skip)]
138 axum: BoxFuture<'static, eyre::Result<std::convert::Infallible>>,
139 axum_listen_addr: SocketAddr,
140 discovery: BoundDiscoveryServer,
141}
142
143impl BoundServer {
144 #[expect(clippy::missing_const_for_fn)] pub fn listen_addr(&self) -> SocketAddr {
147 self.axum_listen_addr
148 }
149
150 pub fn discovery_listen_addr(&self) -> SocketAddr {
152 self.discovery.listen_addr()
153 }
154
155 pub async fn start(self) -> eyre::Result<std::convert::Infallible> {
160 match tokio::select! {
161 axum = self.axum => axum?,
162 discovery = self.discovery.start() => discovery,
163 } {}
164 }
165}
166
167#[derive(Deserialize)]
168struct ApiPath {
169 #[serde(with = "DevicePath")]
170 device_type: DeviceType,
171 device_number: usize,
172 action: String,
173}
174
175impl Server {
176 pub async fn bind(self) -> eyre::Result<BoundServer> {
178 let addr = self.listen_addr;
179
180 tracing::debug!(%addr, "Binding Alpaca server");
181
182 let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?;
189
190 if addr.is_ipv6() {
191 socket.set_only_v6(false)?;
192 }
193
194 socket.set_nonblocking(true)?;
195 socket.bind(&addr.into())?;
196 socket.listen(128)?;
197
198 let listener = TcpListener::from_std(socket.into())?;
199
200 let bound_addr = listener.local_addr()?;
202
203 tracing::info!(%bound_addr, "Bound Alpaca server");
204
205 let discovery_server = DiscoveryServer::for_alpaca_server_at(bound_addr)
208 .bind()
209 .await?;
210
211 tracing::debug!("Bound Alpaca discovery server");
212
213 Ok(BoundServer {
214 axum: async move {
215 axum::serve(
216 listener,
217 self.into_router()
218 .into_make_service(),
220 )
221 .await?;
222 unreachable!("Alpaca server should never stop without an error")
223 }
224 .instrument(tracing::error_span!("alpaca_server_loop"))
225 .boxed(),
226 axum_listen_addr: bound_addr,
227 discovery: discovery_server,
228 })
229 }
230
231 pub async fn start(self) -> eyre::Result<std::convert::Infallible> {
235 self.bind().await?.start().await
236 }
237
238 #[expect(clippy::too_many_lines)]
239 fn into_router(self) -> Router {
240 let devices = Arc::new(self.devices);
241 let server_info = Arc::new(self.info);
242 let connecting_devices = Arc::new(RwLock::new(FnvHashSet::default()));
243
244 Router::new()
245 .route(
246 "/management/apiversions",
247 routing::get(|server_handler: ServerHandler| {
248 server_handler.exec(async move |_params| ValueResponse { value: [1_u32] })
249 }),
250 )
251 .route("/management/v1/configureddevices", {
252 let this = Arc::clone(&devices);
253
254 routing::get(|server_handler: ServerHandler| {
255 server_handler.exec(async move |_params| ValueResponse {
256 value: this
257 .iter_all()
258 .map(|(device, number)| device.to_configured_device(number))
259 .collect::<Vec<_>>(),
260 })
261 })
262 })
263 .route("/management/v1/description", {
264 let server_info = Arc::clone(&server_info);
265
266 routing::get(move |server_handler: ServerHandler| {
267 server_handler.exec(async move |_params| ValueResponse {
268 value: Arc::clone(&server_info),
269 })
270 })
271 })
272 .route("/setup", {
273 let this = Arc::clone(&devices);
274 let server_info = Arc::clone(&server_info);
275
276 routing::get(async move || {
277 let mut setup_page = setup_page::SetupPage {
278 server_info: &server_info,
279 grouped_devices: BTreeMap::new(),
280 };
281
282 for (device, number) in this.iter_all() {
283 let device = device.to_configured_device(number);
284
285 setup_page
286 .grouped_devices
287 .entry(device.ty)
288 .or_default()
289 .push((number, device.name));
290 }
291
292 Html(setup_page.to_string())
293 })
294 })
295 .route(
296 "/api/v1/{device_type}/{device_number}/{action}",
297 routing::any(
298 async move |Path(ApiPath {
299 device_type,
300 device_number,
301 action,
302 }),
303 #[cfg(feature = "camera")] headers: http::HeaderMap,
304 server_handler: ServerHandler| {
305 #[cfg(feature = "camera")]
306 let mut action = action;
307
308 #[cfg(feature = "camera")]
309 if device_type == DeviceType::Camera {
310 use crate::api::camera::{ImageArray, ImageBytesResponse};
311
312 if action == "imagearrayvariant" {
315 action.truncate("imagearray".len());
316 }
317
318 if matches!(server_handler.params, ActionParams::Get { .. })
319 && action == "imagearray"
320 && ImageArray::is_accepted(&headers)
321 {
322 return server_handler
323 .exec(async move |_params| {
324 Ok::<_, Error>(ImageBytesResponse(
325 devices
326 .get_for_server::<dyn Camera>(device_number)?
327 .image_array()
328 .await?,
329 ))
330 })
331 .await;
332 }
333 }
334
335 if action == "setup" {
337 return match devices
338 .get_device_for_server(device_type, device_number)?
339 .setup()
340 .await
341 {
342 Ok(html) => Ok(Html(html).into_response()),
343 Err(err) => {
344 Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{err:#}"))
345 .into())
346 }
347 };
348 }
349
350 if action == "connect" || action == "disconnect" {
354 return server_handler.exec(async move |_params| {
355 let device = devices.get_device_for_server(device_type, device_number)?;
356 if let Ok(mut connecting_devices) = connecting_devices.write() {
357 _ = connecting_devices.insert(Arc::clone(&device));
358 }
359 _ = tokio::spawn(async move {
360 if let Err(err) = device.set_connected(action == "connect").await {
361 tracing::error!(%err, "Error changing device connection state");
362 }
363 if let Ok(mut connecting_devices) = connecting_devices.write() {
364 _ = connecting_devices.remove(&device);
365 }
366 }.in_current_span());
367 Result::Ok(())
368 }).await;
369 }
370 if action == "connecting" {
371 return server_handler.exec(async move |_params| {
372 let device = devices.get_device_for_server(device_type, device_number)?;
373 Result::Ok(connecting_devices.read().is_ok_and(|connecting_devices| connecting_devices.contains(&device)))
374 }).await;
375 }
376
377 server_handler.exec(|params| devices.handle_action(device_type, device_number, &action, params)).await
378 },
379 ),
380 )
381 }
382}