ascom_alpaca/server/
mod.rs

1use crate::api::DevicePath;
2
3mod case_insensitive_str;
4
5mod discovery;
6pub use discovery::{BoundServer as BoundDiscoveryServer, Server as DiscoveryServer};
7
8mod error;
9pub(crate) use error::{Error, Result};
10
11mod params;
12pub(crate) use params::ActionParams;
13
14mod response;
15
16#[macro_use]
17mod setup_page;
18
19mod transaction;
20pub(crate) use transaction::*;
21
22#[cfg(feature = "test")]
23pub(crate) mod test;
24
25use crate::Devices;
26#[cfg(feature = "camera")]
27use crate::api::Camera;
28use crate::api::{CargoServerInfo, DeviceType, ServerInfo};
29use crate::discovery::DEFAULT_DISCOVERY_PORT;
30use crate::response::ValueResponse;
31use axum::extract::{FromRequest, Path, Request};
32use axum::response::{Html, IntoResponse, Response};
33use axum::{Router, routing};
34use fnv::FnvHashSet;
35use futures::future::{BoxFuture, FutureExt};
36use http::StatusCode;
37use net_literals::addr;
38use serde::Deserialize;
39use socket2::{Domain, Protocol, Socket, Type};
40use std::collections::BTreeMap;
41use std::net::SocketAddr;
42use std::sync::{Arc, RwLock};
43use tokio::net::TcpListener;
44use tracing::Instrument;
45
/// The Alpaca server.
///
/// This is the configuration consumed by [`Server::bind`] and [`Server::start`]:
/// the registered devices, the server description and the network settings.
#[derive(Debug)]
pub struct Server {
    /// Registered devices.
    pub devices: Devices,
    /// General server information.
    ///
    /// Returned as the value of `/management/v1/description` and passed to the `/setup` page.
    pub info: ServerInfo,
    /// Address for the server to listen on.
    ///
    /// Defaults to listening on an arbitrary port on all interfaces.
    pub listen_addr: SocketAddr,
    /// Port for the discovery server to listen on.
    ///
    /// Defaults to 32227.
    pub discovery_port: u16,
}
62
63impl Server {
64    /// Create a server with default configuration and the provided server information.
65    ///
66    /// Server information can be automatically populated from `Cargo.toml` using the [`CargoServerInfo!`] macro:
67    ///
68    /// ```
69    /// # use ascom_alpaca::Server;
70    /// use ascom_alpaca::api::CargoServerInfo;
71    ///
72    /// let server = Server::new(CargoServerInfo!());
73    /// ```
74    pub const fn new(info: ServerInfo) -> Self {
75        Self {
76            devices: Devices::default(),
77            info,
78            listen_addr: addr!("[::]:0"),
79            discovery_port: DEFAULT_DISCOVERY_PORT,
80        }
81    }
82}
83
/// An incoming request captured by the `FromRequest` extractor: its URI path plus the
/// parameters extracted from it.
struct ServerHandler {
    /// Path component of the request URI; recorded on the tracing span created in `exec`.
    path: String,
    /// Parameters obtained from the request via `ActionParams::from_request`.
    params: ActionParams,
}
88
89impl<S: Send + Sync> FromRequest<S> for ServerHandler {
90    type Rejection = Response;
91
92    async fn from_request(req: Request, state: &S) -> std::result::Result<Self, Self::Rejection> {
93        let path = req.uri().path().to_owned();
94        let params = ActionParams::from_request(req, state).await?;
95        Ok(Self { path, params })
96    }
97}
98
99impl ServerHandler {
100    async fn exec<Output>(
101        mut self,
102        make_response: impl AsyncFnOnce(ActionParams) -> Output,
103    ) -> axum::response::Result<Response>
104    where
105        ResponseWithTransaction<Output>: IntoResponse,
106    {
107        let request_transaction = RequestTransaction::extract(&mut self.params)?;
108        let response_transaction =
109            ResponseTransaction::new(request_transaction.client_transaction_id);
110
111        let span = tracing::error_span!(
112            "handle_alpaca_request",
113            path = self.path,
114            client_id = request_transaction.client_id,
115            client_transaction_id = request_transaction.client_transaction_id,
116            server_transaction_id = response_transaction.server_transaction_id,
117        );
118
119        Ok(async move {
120            tracing::debug!(params = ?self.params, "Received request");
121
122            ResponseWithTransaction {
123                transaction: response_transaction,
124                response: make_response(self.params).await,
125            }
126        }
127        .instrument(span)
128        .await
129        .into_response())
130    }
131}
132
/// Alpaca servers bound to their respective ports and ready to listen.
///
/// Produced by [`Server::bind`]; nothing is served until [`BoundServer::start`] is awaited.
#[derive(derive_more::Debug)]
pub struct BoundServer {
    // Axum types are a bit complicated, so just Box it for now.
    // The future serves the Alpaca HTTP API and is only polled from `start`.
    #[debug(skip)]
    axum: BoxFuture<'static, eyre::Result<std::convert::Infallible>>,
    // Address reported by the bound Alpaca listener; returned from `listen_addr`.
    axum_listen_addr: SocketAddr,
    // The already bound discovery server, started alongside the Alpaca server.
    discovery: BoundDiscoveryServer,
}
142
143impl BoundServer {
144    /// Returns the address the main Alpaca server is listening on.
145    #[expect(clippy::missing_const_for_fn)] // we don't want to guarantee this will be always const
146    pub fn listen_addr(&self) -> SocketAddr {
147        self.axum_listen_addr
148    }
149
150    /// Returns the address the discovery server is listening on.
151    pub fn discovery_listen_addr(&self) -> SocketAddr {
152        self.discovery.listen_addr()
153    }
154
155    /// Starts the Alpaca and discovery servers.
156    ///
157    /// Note: this function starts an infinite async loop and it's your responsibility to spawn it off
158    /// via [`tokio::spawn`] if necessary.
159    pub async fn start(self) -> eyre::Result<std::convert::Infallible> {
160        match tokio::select! {
161            axum = self.axum => axum?,
162            discovery = self.discovery.start() => discovery,
163        } {}
164    }
165}
166
/// Path parameters of the `/api/v1/{device_type}/{device_number}/{action}` route.
#[derive(Deserialize)]
struct ApiPath {
    /// Device type segment, deserialized through `DevicePath`.
    #[serde(with = "DevicePath")]
    device_type: DeviceType,
    /// Device number segment.
    device_number: usize,
    /// Action segment, matched against the known endpoint names by the route handler.
    action: String,
}
174
175impl Server {
176    /// Binds the Alpaca and discovery servers to local ports.
177    pub async fn bind(self) -> eyre::Result<BoundServer> {
178        let addr = self.listen_addr;
179
180        tracing::debug!(%addr, "Binding Alpaca server");
181
182        // Like in discovery, use dual stack (IPv4+IPv6) consistently on all platforms.
183        //
184        // This is usually what user wants when setting IPv6 address like `[::]`
185        // and this is what happens by default on popular Linux distros but not on Windows.
186        //
187        // For that, we can't use the standard `TcpListener::bind` and need to build our own socket.
188        let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?;
189
190        if addr.is_ipv6() {
191            socket.set_only_v6(false)?;
192        }
193
194        socket.set_nonblocking(true)?;
195        socket.bind(&addr.into())?;
196        socket.listen(128)?;
197
198        let listener = TcpListener::from_std(socket.into())?;
199
200        // The address can differ e.g. when using port 0 (auto-assigned).
201        let bound_addr = listener.local_addr()?;
202
203        tracing::info!(%bound_addr, "Bound Alpaca server");
204
205        // Bind discovery server only once the Alpaca server is bound successfully.
206        // We need to know the bound address & the port to advertise.
207        let discovery_server = DiscoveryServer::for_alpaca_server_at(bound_addr)
208            .bind()
209            .await?;
210
211        tracing::debug!("Bound Alpaca discovery server");
212
213        Ok(BoundServer {
214            axum: async move {
215                axum::serve(
216                    listener,
217                    self.into_router()
218                        // .layer(TraceLayer::new_for_http())
219                        .into_make_service(),
220                )
221                .await?;
222                unreachable!("Alpaca server should never stop without an error")
223            }
224            .instrument(tracing::error_span!("alpaca_server_loop"))
225            .boxed(),
226            axum_listen_addr: bound_addr,
227            discovery: discovery_server,
228        })
229    }
230
231    /// Binds the Alpaca and discovery servers to local ports and starts them.
232    ///
233    /// This is a convenience method that is equivalent to calling [`Self::bind`] and [`BoundServer::start`].
234    pub async fn start(self) -> eyre::Result<std::convert::Infallible> {
235        self.bind().await?.start().await
236    }
237
238    #[expect(clippy::too_many_lines)]
239    fn into_router(self) -> Router {
240        let devices = Arc::new(self.devices);
241        let server_info = Arc::new(self.info);
242        let connecting_devices = Arc::new(RwLock::new(FnvHashSet::default()));
243
244        Router::new()
245            .route(
246                "/management/apiversions",
247                routing::get(|server_handler: ServerHandler| {
248                    server_handler.exec(async move |_params| ValueResponse { value: [1_u32] })
249                }),
250            )
251            .route("/management/v1/configureddevices", {
252                let this = Arc::clone(&devices);
253
254                routing::get(|server_handler: ServerHandler| {
255                    server_handler.exec(async move |_params| ValueResponse {
256                        value: this
257                            .iter_all()
258                            .map(|(device, number)| device.to_configured_device(number))
259                            .collect::<Vec<_>>(),
260                    })
261                })
262            })
263            .route("/management/v1/description", {
264                let server_info = Arc::clone(&server_info);
265
266                routing::get(move |server_handler: ServerHandler| {
267                    server_handler.exec(async move |_params| ValueResponse {
268                        value: Arc::clone(&server_info),
269                    })
270                })
271            })
272            .route("/setup", {
273                let this = Arc::clone(&devices);
274                let server_info = Arc::clone(&server_info);
275
276                routing::get(async move || {
277                    let mut setup_page = setup_page::SetupPage {
278                        server_info: &server_info,
279                        grouped_devices: BTreeMap::new(),
280                    };
281
282                    for (device, number) in this.iter_all() {
283                        let device = device.to_configured_device(number);
284
285                        setup_page
286                            .grouped_devices
287                            .entry(device.ty)
288                            .or_default()
289                            .push((number, device.name));
290                    }
291
292                    Html(setup_page.to_string())
293                })
294            })
295            .route(
296                "/api/v1/{device_type}/{device_number}/{action}",
297                routing::any(
298                    async move |Path(ApiPath {
299                              device_type,
300                              device_number,
301                              action,
302                          }),
303                          #[cfg(feature = "camera")] headers: http::HeaderMap,
304                          server_handler: ServerHandler| {
305                        #[cfg(feature = "camera")]
306                        let mut action = action;
307
308                        #[cfg(feature = "camera")]
309                        if device_type == DeviceType::Camera {
310                            use crate::api::camera::{ImageArray, ImageBytesResponse};
311
312                            // imagearrayvariant is soft-deprecated; we should accept it but
313                            // forward to the imagearray handler instead.
314                            if action == "imagearrayvariant" {
315                                action.truncate("imagearray".len());
316                            }
317
318                            if matches!(server_handler.params, ActionParams::Get { .. })
319                                && action == "imagearray"
320                                && ImageArray::is_accepted(&headers)
321                            {
322                                return server_handler
323                                    .exec(async move |_params| {
324                                        Ok::<_, Error>(ImageBytesResponse(
325                                            devices
326                                                .get_for_server::<dyn Camera>(device_number)?
327                                                .image_array()
328                                                .await?,
329                                        ))
330                                    })
331                                    .await;
332                            }
333                        }
334
335                        // Setup endpoint is not an ASCOM method, so doesn't need the transaction and ASCOMResult wrapping.
336                        if action == "setup" {
337                            return match devices
338                                .get_device_for_server(device_type, device_number)?
339                                .setup()
340                                .await
341                            {
342                                Ok(html) => Ok(Html(html).into_response()),
343                                Err(err) => {
344                                    Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{err:#}"))
345                                        .into())
346                                }
347                            };
348                        }
349
350                        // Handle Platform 7 connection methods.
351                        // It doesn't make sense to expose them in public API because our methods, including setters, are already asynchronous,
352                        // so we only need to handle these extra methods for 3rd-party client compatibility.
353                        if action == "connect" || action == "disconnect" {
354                            return server_handler.exec(async move |_params| {
355                                let device = devices.get_device_for_server(device_type, device_number)?;
356                                if let Ok(mut connecting_devices) = connecting_devices.write() {
357                                    _ = connecting_devices.insert(Arc::clone(&device));
358                                }
359                                _ = tokio::spawn(async move {
360                                    if let Err(err) = device.set_connected(action == "connect").await {
361                                        tracing::error!(%err, "Error changing device connection state");
362                                    }
363                                    if let Ok(mut connecting_devices) = connecting_devices.write() {
364                                        _ = connecting_devices.remove(&device);
365                                    }
366                                }.in_current_span());
367                                Result::Ok(())
368                            }).await;
369                        }
370                        if action == "connecting" {
371                            return server_handler.exec(async move |_params| {
372                                let device = devices.get_device_for_server(device_type, device_number)?;
373                                Result::Ok(connecting_devices.read().is_ok_and(|connecting_devices| connecting_devices.contains(&device)))
374                            }).await;
375                        }
376
377                        server_handler.exec(|params| devices.handle_action(device_type, device_number, &action, params)).await
378                    },
379                ),
380            )
381    }
382}