fts_server/
lib.rs

1#![warn(missing_docs)]
2// Note: this overwrites the link in the README to point to the rust docs of the fts-demo crate.
3//! [fts_core]: https://docs.rs/fts_core/latest/fts_core/index.html
4//! [fts_server]: https://docs.rs/fts_server/latest/fts_server/index.html
5//! [fts_solver]: https://docs.rs/fts_solver/latest/fts_solver/index.html
6//! [fts_demo]: https://docs.rs/fts_demo/latest/fts_demo/index.html
7#![doc = include_str!("../README.md")]
8use fts_core::{
9    models::{BidderId, Outcome, ProductId, RawAuctionInput},
10    ports::{AuctionRepository, MarketRepository},
11};
12
13use axum::Json;
14use axum::Router;
15use axum::http::header;
16use axum::response::sse::Event;
17use fts_solver::Solver;
18use fxhash::FxBuildHasher;
19use openapi::openapi_router;
20use serde::Serialize;
21use std::sync::Arc;
22use std::{convert::Infallible, net::SocketAddr};
23use time::OffsetDateTime;
24use tokio::sync::{mpsc, watch};
25use tokio::task::JoinHandle;
26use tokio::try_join;
27use tower_http::cors;
28
29mod openapi;
30mod routes;
31mod utils;
32
33pub use crate::openapi::MarketplaceApi;
34pub use utils::CustomJWTClaims;
35use utils::JWTVerifier;
36pub use utils::Now;
37pub use utils::generate_jwt;
38
39/// Type alias for a thread-safe map of senders for SSE events.
40///
41/// Used to maintain channels for server-sent events for real-time updates.
42type SenderMap<T> =
43    Arc<dashmap::DashMap<T, watch::Sender<Result<Event, Infallible>>, FxBuildHasher>>;
44
45/// Application state shared across all request handlers.
46///
47/// Contains references to repositories, communication channels, and
48/// authentication services used throughout the application.
49#[derive(Clone)]
50pub struct AppState<T: MarketRepository> {
51    /// JWT verification service
52    jwt: JWTVerifier,
53    /// Market repository for data access
54    market: T,
55    /// Channel for queueing auction solves
56    solve_queue: mpsc::Sender<RawAuctionInput<T::AuctionId>>,
57    /// Channel for receiving activity updates
58    activity_receiver: watch::Receiver<Result<Event, Infallible>>,
59    /// Channels for product-specific updates
60    product_sender: SenderMap<ProductId>,
61    /// Channels for bidder-specific updates
62    bidder_sender: SenderMap<BidderId>,
63}
64
65/// Represents an update to be sent via server-sent events.
66///
67/// Contains auction outcome data along with its time range.
68#[derive(Serialize)]
69pub struct Update<T> {
70    /// Start time of the auction period
71    #[serde(with = "time::serde::rfc3339")]
72    pub from: OffsetDateTime,
73    /// End time of the auction period
74    #[serde(with = "time::serde::rfc3339")]
75    pub thru: OffsetDateTime,
76    /// Outcome data from the auction
77    #[serde(flatten)]
78    pub outcome: T,
79}
80
81/// Response for the health check endpoint
82#[derive(Serialize)]
83struct HealthResponse {
84    status: String,
85}
86
87/// Simple health check endpoint
88async fn health_check() -> Json<HealthResponse> {
89    Json(HealthResponse {
90        status: "ok".to_string(),
91    })
92}
93
94/// Creates the application state and solver background task.
95pub fn state<T: MarketRepository>(
96    api_secret: &str,
97    market: T,
98) -> (AppState<T>, JoinHandle<Result<(), T::Error>>) {
99    // We create a FIFO queue for solving auctions
100    let (solve_sender, mut solve_receiver) = mpsc::channel::<RawAuctionInput<T::AuctionId>>(24);
101
102    // These channels are for reporting activity to SSE subscribers on /activity
103    let (activity_sender, activity_receiver) = watch::channel(Ok(Event::default().comment("")));
104
105    let product_sender: SenderMap<ProductId> = Default::default();
106    let bidder_sender: SenderMap<BidderId> = Default::default();
107
108    let solver = {
109        let market = market.clone();
110        let activity_sender = activity_sender.clone();
111        let product_sender = product_sender.clone();
112        let bidder_sender = bidder_sender.clone();
113        tokio::spawn(async move {
114            let solver = T::solver();
115            while let Some(auction) = solve_receiver.recv().await {
116                let id = auction.id.clone();
117
118                // Convert the auction into a format the solver understands
119                let submissions = auction.into_solver();
120
121                // TODO: this is where warm-starting would be used
122                let fts_solver::AuctionOutcome {
123                    submissions,
124                    products,
125                } = solver.solve(&submissions);
126
127                // TODO: update the API to scope the auth_id the bidder_id
128                let auth_outcomes = submissions
129                    .values()
130                    .flat_map(|outcome| {
131                        outcome.iter().map(|(auth_id, auth_outcome)| {
132                            (
133                                auth_id.clone(),
134                                Outcome {
135                                    price: auth_outcome.price,
136                                    trade: auth_outcome.trade,
137                                    data: None,
138                                },
139                            )
140                        })
141                    })
142                    .collect::<Vec<_>>();
143
144                let product_outcomes = products
145                    .iter()
146                    .map(|(product_id, product_outcome)| {
147                        (
148                            product_id.clone(),
149                            Outcome {
150                                price: product_outcome.price,
151                                trade: product_outcome.trade,
152                                data: None,
153                            },
154                        )
155                    })
156                    .collect::<Vec<_>>();
157
158                let now = OffsetDateTime::now_utc().into();
159
160                // We are basically copy/pasting solver::*Outcome into crate::Outcome, which seems silly.
161                // But, if the server wants to augment or transform the data somehow, this a reasonable
162                // indirection to have.
163                let metadata = AuctionRepository::report(
164                    &market.clone(),
165                    id,
166                    auth_outcomes.into_iter(),
167                    product_outcomes.into_iter(),
168                    now,
169                )
170                .await?;
171
172                // Now that we've stored the outcomes, we push updates to the SSE listeners
173                if let Some(metadata) = metadata {
174                    let _ = activity_sender.send_replace(Ok(Event::default()
175                        .event("outcome")
176                        .data(serde_json::to_string(&metadata).expect("infallible!"))));
177
178                    // We also individually broadcast the results to any listeners.
179                    // TODO: think about how to cleanup the dashmap over time
180                    for (product_id, outcome) in products {
181                        if let Some(channel) = product_sender.get(&product_id) {
182                            let update = Update {
183                                from: metadata.from,
184                                thru: metadata.thru,
185                                outcome,
186                            };
187                            let _ = channel.send_replace(Ok(Event::default()
188                                .event("outcome")
189                                .data(serde_json::to_string(&update).expect("infallible!"))));
190                        };
191                    }
192
193                    for (bidder_id, outcome) in submissions {
194                        if let Some(channel) = bidder_sender.get(&bidder_id) {
195                            let update = Update {
196                                from: metadata.from,
197                                thru: metadata.thru,
198                                outcome,
199                            };
200                            let _ = channel.send_replace(Ok(Event::default()
201                                .event("outcome")
202                                .data(serde_json::to_string(&update).expect("infallible!"))));
203                        }
204                    }
205                }
206            }
207            Result::<(), T::Error>::Ok(())
208        })
209    };
210
211    let state = AppState {
212        jwt: JWTVerifier::from(api_secret),
213        market,
214        solve_queue: solve_sender,
215        activity_receiver,
216        product_sender,
217        bidder_sender,
218    };
219
220    (state, solver)
221}
222
223/// Creates the application router with all routes configured.
224pub fn router<T: MarketRepository>(state: AppState<T>) -> Router {
225    // To allow for web app access, we use a permissive CORS policy. Notably,
226    // this strips any implicit authorization, making this a pretty decent policy.
227    let policy = cors::CorsLayer::new()
228        .allow_origin(cors::Any)
229        .allow_methods(cors::Any)
230        .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE]);
231
232    // Wire it together
233    let app = Router::new()
234        .route("/health", axum::routing::get(health_check))
235        .nest("/v0/auths", routes::auths::router())
236        .nest("/v0/costs", routes::costs::router())
237        // Bidder-specific routes for their submission
238        .nest("/v0/submissions", routes::submission::router(state.clone()))
239        // View products and their results
240        .nest("/v0/products", routes::products::router())
241        // These are the SSE-routes for live-updates
242        .nest("/v0/outcomes", routes::outcomes::router())
243        .nest("/admin", routes::admin::router(state.clone()));
244
245    app.layer(policy).with_state(state)
246}
247
248/// Starts the HTTP server with the configured application.
249pub async fn start<T: MarketRepository>(api_port: u16, api_secret: String, market: T) {
250    // Setup the HTTP server
251    let listener = tokio::net::TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), api_port))
252        .await
253        .expect("Unable to bind local port");
254    tracing::info!(
255        "Listening for requests on {}",
256        listener.local_addr().unwrap()
257    );
258
259    let (app_state, solver) = state(&api_secret, market);
260
261    // Setup the combined application state and serve it with our router
262    let server = tokio::spawn(async move {
263        axum::serve(listener, router(app_state).merge(openapi_router())).await
264    });
265
266    let _ = try_join!(solver, server).expect("shutdown");
267}