fts_server/
lib.rs

1#![warn(missing_docs)]
2// Note: this overwrites the link in the README to point to the rust docs of the fts-demo crate.
3//! [fts_core]: https://docs.rs/fts_core/latest/fts_core/index.html
4//! [fts_server]: https://docs.rs/fts_server/latest/fts_server/index.html
5//! [fts_solver]: https://docs.rs/fts_solver/latest/fts_solver/index.html
6//! [fts_demo]: https://docs.rs/fts_demo/latest/fts_demo/index.html
7#![doc = include_str!("../docs/workspace.md")]
8#![doc = include_str!("../README.md")]
9use fts_core::{
10    models::{AuthId, Outcome, ProductId, RawAuctionInput},
11    ports::{AuctionRepository, MarketRepository},
12};
13
14use axum::Router;
15use axum::http::header;
16use axum::response::sse::Event;
17use fts_solver::Solver;
18use fxhash::FxBuildHasher;
19use openapi::openapi_router;
20use serde::Serialize;
21use std::sync::Arc;
22use std::{convert::Infallible, net::SocketAddr};
23use time::OffsetDateTime;
24use tokio::sync::{mpsc, watch};
25use tokio::task::JoinHandle;
26use tokio::try_join;
27use tower_http::cors;
28
29mod openapi;
30mod routes;
31mod utils;
32
33pub use utils::CustomJWTClaims;
34use utils::JWTVerifier;
35pub use utils::Now;
36pub use utils::generate_jwt;
37
38/// Type alias for a thread-safe map of senders for SSE events.
39///
40/// Used to maintain channels for server-sent events for real-time updates.
41type SenderMap<T> =
42    Arc<dashmap::DashMap<T, watch::Sender<Result<Event, Infallible>>, FxBuildHasher>>;
43
44/// Application state shared across all request handlers.
45///
46/// Contains references to repositories, communication channels, and
47/// authentication services used throughout the application.
48#[derive(Clone)]
49pub struct AppState<T: MarketRepository> {
50    /// JWT verification service
51    jwt: JWTVerifier,
52    /// Market repository for data access
53    market: T,
54    /// Channel for queueing auction solves
55    solve_queue: mpsc::Sender<RawAuctionInput<T::AuctionId>>,
56    /// Channel for receiving activity updates
57    activity_receiver: watch::Receiver<Result<Event, Infallible>>,
58    /// Channels for product-specific updates
59    product_sender: SenderMap<ProductId>,
60    /// Channels for authorization-specific updates
61    auth_sender: SenderMap<AuthId>,
62}
63
64/// Represents an update to be sent via server-sent events.
65///
66/// Contains auction outcome data along with its time range.
67#[derive(Serialize)]
68pub struct Update {
69    /// Start time of the auction period
70    #[serde(with = "time::serde::rfc3339")]
71    pub from: OffsetDateTime,
72    /// End time of the auction period
73    #[serde(with = "time::serde::rfc3339")]
74    pub thru: OffsetDateTime,
75    /// Outcome data from the auction
76    #[serde(flatten)]
77    pub outcome: Outcome<()>,
78}
79
80/// Creates the application state and solver background task.
81pub fn state<T: MarketRepository>(
82    api_secret: &str,
83    market: T,
84) -> (AppState<T>, JoinHandle<Result<(), T::Error>>) {
85    // We create a FIFO queue for solving auctions
86    let (solve_sender, mut solve_receiver) = mpsc::channel::<RawAuctionInput<T::AuctionId>>(24);
87
88    // These channels are for reporting activity to SSE subscribers on /activity
89    let (activity_sender, activity_receiver) = watch::channel(Ok(Event::default().comment("")));
90
91    let product_sender: SenderMap<ProductId> = Default::default();
92    let auth_sender: SenderMap<AuthId> = Default::default();
93
94    let solver = {
95        let market = market.clone();
96        let activity_sender = activity_sender.clone();
97        let product_sender = product_sender.clone();
98        let auth_sender = auth_sender.clone();
99        tokio::spawn(async move {
100            let solver = T::solver();
101            while let Some(auction) = solve_receiver.recv().await {
102                let id = auction.id.clone();
103
104                // Convert the auction into a format the solver understands
105                let submissions: Vec<fts_solver::Submission<_, _>> = auction.into();
106
107                // TODO: this is where warm-starting would be used
108                let fts_solver::AuctionOutcome { auths, products } = solver.solve(&submissions);
109
110                // TODO: update the API to scope the auth_id the bidder_id
111                let auth_outcomes = auths
112                    .iter()
113                    .map(|(auth_id, outcome)| {
114                        (
115                            auth_id.clone(),
116                            Outcome {
117                                price: outcome.price,
118                                trade: outcome.trade,
119                                data: None,
120                            },
121                        )
122                    })
123                    .collect::<Vec<_>>();
124
125                let product_outcomes = products
126                    .iter()
127                    .map(|(id, outcome)| {
128                        (
129                            id.clone(),
130                            Outcome {
131                                price: outcome.price,
132                                trade: outcome.volume,
133                                data: None,
134                            },
135                        )
136                    })
137                    .collect::<Vec<_>>();
138
139                let now = OffsetDateTime::now_utc().into();
140
141                // We are basically copy/pasting solver::*Outcome into crate::Outcome, which seems silly.
142                // But, if the server wants to augment or transform the data somehow, this a reasonable
143                // indirection to have.
144                let metadata = AuctionRepository::report(
145                    &market.clone(),
146                    id,
147                    auth_outcomes.into_iter(),
148                    product_outcomes.into_iter(),
149                    now,
150                )
151                .await?;
152
153                // Now that we've stored the outcomes, we push updates to the SSE listeners
154                if let Some(metadata) = metadata {
155                    let _ = activity_sender.send_replace(Ok(Event::default()
156                        .event("outcome")
157                        .data(serde_json::to_string(&metadata).expect("infallible!"))));
158
159                    // We also individually broadcast the results to any listeners.
160                    // TODO: think about how to cleanup the dashmap over time
161                    for (product_id, product_outcome) in products {
162                        if let Some(channel) = product_sender.get(&product_id) {
163                            let update = Update {
164                                from: metadata.from,
165                                thru: metadata.thru,
166                                outcome: Outcome {
167                                    price: product_outcome.price,
168                                    trade: product_outcome.volume,
169                                    data: None,
170                                },
171                            };
172                            let _ = channel.send_replace(Ok(Event::default()
173                                .event("outcome")
174                                .data(serde_json::to_string(&update).expect("infallible!"))));
175                        };
176                    }
177
178                    for (auth_id, auth_outcome) in auths {
179                        if let Some(channel) = auth_sender.get(&auth_id) {
180                            let update = Update {
181                                from: metadata.from,
182                                thru: metadata.thru,
183                                outcome: Outcome {
184                                    price: auth_outcome.price,
185                                    trade: auth_outcome.trade,
186                                    data: None,
187                                },
188                            };
189                            let _ = channel.send_replace(Ok(Event::default()
190                                .event("outcome")
191                                .data(serde_json::to_string(&update).expect("infallible!"))));
192                        }
193                    }
194                }
195            }
196            Result::<(), T::Error>::Ok(())
197        })
198    };
199
200    let state = AppState {
201        jwt: JWTVerifier::from(api_secret),
202        market,
203        solve_queue: solve_sender,
204        activity_receiver,
205        product_sender,
206        auth_sender,
207    };
208
209    (state, solver)
210}
211
212/// Creates the application router with all routes configured.
213pub fn router<T: MarketRepository>(state: AppState<T>) -> Router {
214    // To allow for web app access, we use a permissive CORS policy. Notably,
215    // this strips any implicit authorization, making this a pretty decent policy.
216    let policy = cors::CorsLayer::new()
217        .allow_origin(cors::Any)
218        .allow_methods(cors::Any)
219        .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE]);
220
221    // Wire it together
222    let app = Router::new()
223        .nest("/v0/auths", routes::auths::router())
224        .nest("/v0/costs", routes::costs::router())
225        // Bidder-specific routes for their submission
226        .nest("/v0/submissions", routes::submission::router(state.clone()))
227        // View products and their results
228        .nest("/v0/products", routes::products::router())
229        // These are the SSE-routes for live-updates
230        .nest("/v0/outcomes", routes::outcomes::router())
231        .nest("/admin", routes::admin::router(state.clone()));
232
233    app.layer(policy).with_state(state)
234}
235
236/// Starts the HTTP server with the configured application.
237pub async fn start<T: MarketRepository>(api_port: u16, api_secret: String, market: T) {
238    // Setup the HTTP server
239    let listener = tokio::net::TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), api_port))
240        .await
241        .expect("Unable to bind local port");
242    tracing::info!(
243        "Listening for requests on {}",
244        listener.local_addr().unwrap()
245    );
246
247    let (app_state, solver) = state(&api_secret, market);
248
249    // Setup the combined application state and serve it with our router
250    let server = tokio::spawn(async move {
251        axum::serve(listener, router(app_state).merge(openapi_router())).await
252    });
253
254    let _ = try_join!(solver, server).expect("shutdown");
255}