fts_server/
lib.rs

1#![warn(missing_docs)]
// Note: the link definitions below override the links in the README so that they point to the docs.rs pages of the respective crates.
3//! [fts_core]: https://docs.rs/fts_core/latest/fts_core/index.html
4//! [fts_server]: https://docs.rs/fts_server/latest/fts_server/index.html
5//! [fts_solver]: https://docs.rs/fts_solver/latest/fts_solver/index.html
6//! [fts_sqlite]: https://docs.rs/fts_sqlite/latest/fts_sqlite/index.html
7#![doc = include_str!("../README.md")]
8use fts_core::{
9    models::{BidderId, Outcome, ProductId, RawAuctionInput},
10    ports::{AuctionRepository, MarketRepository, ProductRepository},
11};
12
13use axum::Json;
14use axum::Router;
15use axum::http::header;
16use axum::response::sse::Event;
17use fts_solver::Solver;
18use openapi::openapi_router;
19use rustc_hash::FxBuildHasher;
20use serde::Serialize;
21use std::sync::Arc;
22use std::{convert::Infallible, net::SocketAddr};
23use time::OffsetDateTime;
24use tokio::sync::{mpsc, watch};
25use tokio::task::JoinHandle;
26use tower_http::cors;
27
28mod openapi;
29mod routes;
30mod utils;
31
32pub use crate::openapi::MarketplaceApi;
33pub use utils::CustomJWTClaims;
34use utils::JWTVerifier;
35pub use utils::Now;
36pub use utils::generate_jwt;
37
38/// Type alias for a thread-safe map of senders for SSE events.
39///
40/// Used to maintain channels for server-sent events for real-time updates.
41type SenderMap<T> =
42    Arc<dashmap::DashMap<T, watch::Sender<Result<Event, Infallible>>, FxBuildHasher>>;
43
44/// Application state shared across all request handlers.
45///
46/// Contains references to repositories, communication channels, and
47/// authentication services used throughout the application.
48#[derive(Clone)]
49pub struct AppState<T: MarketRepository> {
50    /// JWT verification service
51    jwt: JWTVerifier,
52    /// Market repository for data access
53    market: T,
54    /// Channel for queueing auction solves
55    solve_queue: mpsc::Sender<RawAuctionInput<T::AuctionId>>,
56    /// Channel for receiving activity updates
57    activity_receiver: watch::Receiver<Result<Event, Infallible>>,
58    /// Channels for product-specific updates
59    product_sender: SenderMap<ProductId>,
60    /// Channels for bidder-specific updates
61    bidder_sender: SenderMap<BidderId>,
62}
63
64impl<T: MarketRepository> AppState<T> {
65    /// Wrap the implementation to provide a convenient method to send auctions to the solve queue
66    pub async fn solve(
67        &self,
68        from: Option<OffsetDateTime>,
69        thru: OffsetDateTime,
70        by: Option<time::Duration>,
71        timestamp: OffsetDateTime,
72    ) -> Result<(), T::Error> {
73        let data = self.market.prepare(from, thru, by, timestamp).await?;
74        if let Some(auctions) = data {
75            for auction in auctions {
76                self.solve_queue
77                    .send(auction)
78                    .await
79                    .expect("queue capacity exceeded");
80            }
81            Ok(())
82        } else {
83            Ok(())
84        }
85    }
86}
87
88/// Represents an update to be sent via server-sent events.
89///
90/// Contains auction outcome data along with its time range.
91#[derive(Serialize)]
92pub struct Update<T> {
93    /// Start time of the auction period
94    #[serde(with = "time::serde::rfc3339")]
95    pub from: OffsetDateTime,
96    /// End time of the auction period
97    #[serde(with = "time::serde::rfc3339")]
98    pub thru: OffsetDateTime,
99    /// Outcome data from the auction
100    #[serde(flatten)]
101    pub outcome: T,
102}
103
104/// Response for the health check endpoint
105#[derive(Serialize)]
106struct HealthResponse {
107    status: String,
108}
109
110/// Simple health check endpoint
111async fn health_check() -> Json<HealthResponse> {
112    Json(HealthResponse {
113        status: "ok".to_string(),
114    })
115}
116
117/// Creates the application state and solver background task.
118pub fn state<T: MarketRepository>(
119    api_secret: &str,
120    market: T,
121) -> (AppState<T>, JoinHandle<Result<(), T::Error>>) {
122    // We create a FIFO queue for solving auctions
123    let (solve_sender, mut solve_receiver) = mpsc::channel::<RawAuctionInput<T::AuctionId>>(24);
124
125    // These channels are for reporting activity to SSE subscribers on /activity
126    let (activity_sender, activity_receiver) = watch::channel(Ok(Event::default().comment("")));
127
128    let product_sender: SenderMap<ProductId> = Default::default();
129    let bidder_sender: SenderMap<BidderId> = Default::default();
130
131    let solver = {
132        let market = market.clone();
133        let activity_sender = activity_sender.clone();
134        let product_sender = product_sender.clone();
135        let bidder_sender = bidder_sender.clone();
136        tokio::spawn(async move {
137            let solver = T::solver();
138            while let Some(auction) = solve_receiver.recv().await {
139                let id = auction.id.clone();
140
141                // Convert the auction into a format the solver understands
142                let submissions = auction.into_solver();
143
144                // TODO: this is where warm-starting would be used
145                let fts_solver::AuctionOutcome {
146                    submissions,
147                    products,
148                } = solver.solve(&submissions).expect("could not solve auction");
149
150                let auth_outcomes = submissions
151                    .values()
152                    .flat_map(|outcome| {
153                        outcome.iter().map(|(auth_id, auth_outcome)| {
154                            (
155                                auth_id.clone(),
156                                Outcome {
157                                    price: auth_outcome.price,
158                                    trade: auth_outcome.trade,
159                                    data: None,
160                                },
161                            )
162                        })
163                    })
164                    .collect::<Vec<_>>();
165
166                let product_outcomes = products
167                    .iter()
168                    .map(|(product_id, product_outcome)| {
169                        (
170                            product_id.clone(),
171                            Outcome {
172                                price: product_outcome.price,
173                                trade: product_outcome.trade,
174                                data: None,
175                            },
176                        )
177                    })
178                    .collect::<Vec<_>>();
179
180                let now = OffsetDateTime::now_utc().into();
181
182                // We are basically copy/pasting solver::*Outcome into crate::Outcome, which seems silly.
183                // But, if the server wants to augment or transform the data somehow, this a reasonable
184                // indirection to have.
185                let metadata = AuctionRepository::report(
186                    &market.clone(),
187                    id,
188                    auth_outcomes.into_iter(),
189                    product_outcomes.into_iter(),
190                    now,
191                )
192                .await?;
193
194                // Now that we've stored the outcomes, we push updates to the SSE listeners
195                if let Some(metadata) = metadata {
196                    let _ = activity_sender.send_replace(Ok(Event::default()
197                        .event("outcome")
198                        .data(serde_json::to_string(&metadata).expect("infallible!"))));
199
200                    // We also individually broadcast the results to any listeners.
201                    // TODO: think about how to cleanup the dashmap over time
202                    for (product_id, outcome) in products {
203                        if let Some(channel) = product_sender.get(&product_id) {
204                            let update = Update {
205                                from: metadata.from,
206                                thru: metadata.thru,
207                                outcome,
208                            };
209                            let _ = channel.send_replace(Ok(Event::default()
210                                .event("outcome")
211                                .data(serde_json::to_string(&update).expect("infallible!"))));
212                        };
213                    }
214
215                    for (bidder_id, outcome) in submissions {
216                        if let Some(channel) = bidder_sender.get(&bidder_id) {
217                            let update = Update {
218                                from: metadata.from,
219                                thru: metadata.thru,
220                                outcome,
221                            };
222                            let _ = channel.send_replace(Ok(Event::default()
223                                .event("outcome")
224                                .data(serde_json::to_string(&update).expect("infallible!"))));
225                        }
226                    }
227                }
228            }
229            Result::<(), T::Error>::Ok(())
230        })
231    };
232
233    let state = AppState {
234        jwt: JWTVerifier::from(api_secret),
235        market,
236        solve_queue: solve_sender,
237        activity_receiver,
238        product_sender,
239        bidder_sender,
240    };
241
242    (state, solver)
243}
244
245/// Creates the application router with all routes configured.
246pub fn router<T: MarketRepository>(state: AppState<T>) -> Router {
247    // To allow for web app access, we use a permissive CORS policy. Notably,
248    // this strips any implicit authorization, making this a pretty decent policy.
249    let policy = cors::CorsLayer::new()
250        .allow_origin(cors::Any)
251        .allow_methods(cors::Any)
252        .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE]);
253
254    // Wire it together
255    let app = Router::new()
256        .route("/health", axum::routing::get(health_check))
257        .nest("/v0/auths", routes::auths::router())
258        .nest("/v0/costs", routes::costs::router())
259        // Bidder-specific routes for their submission
260        .nest("/v0/submissions", routes::submission::router(state.clone()))
261        // View products and their results
262        .nest("/v0/products", routes::products::router())
263        // These are the SSE-routes for live-updates
264        .nest("/v0/outcomes", routes::outcomes::router())
265        .nest("/admin", routes::admin::router(state.clone()));
266
267    app.layer(policy).with_state(state)
268}
269
270/// An object to hold the handles for the relevant tokio processes
271pub struct Server<T: MarketRepository> {
272    /// The webserver
273    pub server: JoinHandle<Result<(), std::io::Error>>,
274    /// The solver
275    pub solver: JoinHandle<Result<(), <T as ProductRepository>::Error>>,
276}
277
278impl<T: MarketRepository> Server<T> {
279    /// Starts the HTTP server with the configured application.
280    pub async fn new(api_port: u16, api_secret: String, market: T) -> (Self, AppState<T>) {
281        // Setup the HTTP server
282        let listener =
283            tokio::net::TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), api_port))
284                .await
285                .expect("Unable to bind local port");
286        tracing::info!(
287            "Listening for requests on {}",
288            listener.local_addr().unwrap()
289        );
290
291        let (state, solver) = state(&api_secret, market);
292
293        // Setup the combined application state and serve it with our router
294        let state_clone = state.clone();
295        let server = tokio::spawn(async move {
296            axum::serve(listener, router(state_clone).merge(openapi_router())).await
297        });
298
299        (Self { server, solver }, state)
300    }
301}