1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
8use fts_core::{
9 models::{BidderId, Outcome, ProductId, RawAuctionInput},
10 ports::{AuctionRepository, MarketRepository},
11};
12
13use axum::Json;
14use axum::Router;
15use axum::http::header;
16use axum::response::sse::Event;
17use fts_solver::Solver;
18use openapi::openapi_router;
19use rustc_hash::FxBuildHasher;
20use serde::Serialize;
21use std::sync::Arc;
22use std::{convert::Infallible, net::SocketAddr};
23use time::OffsetDateTime;
24use tokio::sync::{mpsc, watch};
25use tokio::task::JoinHandle;
26use tokio::try_join;
27use tower_http::cors;
28
29mod openapi;
30mod routes;
31mod utils;
32
33pub use crate::openapi::MarketplaceApi;
34pub use utils::CustomJWTClaims;
35use utils::JWTVerifier;
36pub use utils::Now;
37pub use utils::generate_jwt;
38
39type SenderMap<T> =
43 Arc<dashmap::DashMap<T, watch::Sender<Result<Event, Infallible>>, FxBuildHasher>>;
44
45#[derive(Clone)]
50pub struct AppState<T: MarketRepository> {
51 jwt: JWTVerifier,
53 market: T,
55 solve_queue: mpsc::Sender<RawAuctionInput<T::AuctionId>>,
57 activity_receiver: watch::Receiver<Result<Event, Infallible>>,
59 product_sender: SenderMap<ProductId>,
61 bidder_sender: SenderMap<BidderId>,
63}
64
65#[derive(Serialize)]
69pub struct Update<T> {
70 #[serde(with = "time::serde::rfc3339")]
72 pub from: OffsetDateTime,
73 #[serde(with = "time::serde::rfc3339")]
75 pub thru: OffsetDateTime,
76 #[serde(flatten)]
78 pub outcome: T,
79}
80
81#[derive(Serialize)]
83struct HealthResponse {
84 status: String,
85}
86
87async fn health_check() -> Json<HealthResponse> {
89 Json(HealthResponse {
90 status: "ok".to_string(),
91 })
92}
93
94pub fn state<T: MarketRepository>(
96 api_secret: &str,
97 market: T,
98) -> (AppState<T>, JoinHandle<Result<(), T::Error>>) {
99 let (solve_sender, mut solve_receiver) = mpsc::channel::<RawAuctionInput<T::AuctionId>>(24);
101
102 let (activity_sender, activity_receiver) = watch::channel(Ok(Event::default().comment("")));
104
105 let product_sender: SenderMap<ProductId> = Default::default();
106 let bidder_sender: SenderMap<BidderId> = Default::default();
107
108 let solver = {
109 let market = market.clone();
110 let activity_sender = activity_sender.clone();
111 let product_sender = product_sender.clone();
112 let bidder_sender = bidder_sender.clone();
113 tokio::spawn(async move {
114 let solver = T::solver();
115 while let Some(auction) = solve_receiver.recv().await {
116 let id = auction.id.clone();
117
118 let submissions = auction.into_solver();
120
121 let fts_solver::AuctionOutcome {
123 submissions,
124 products,
125 } = solver.solve(&submissions).expect("could not solve auction");
126
127 let auth_outcomes = submissions
128 .values()
129 .flat_map(|outcome| {
130 outcome.iter().map(|(auth_id, auth_outcome)| {
131 (
132 auth_id.clone(),
133 Outcome {
134 price: auth_outcome.price,
135 trade: auth_outcome.trade,
136 data: None,
137 },
138 )
139 })
140 })
141 .collect::<Vec<_>>();
142
143 let product_outcomes = products
144 .iter()
145 .map(|(product_id, product_outcome)| {
146 (
147 product_id.clone(),
148 Outcome {
149 price: product_outcome.price,
150 trade: product_outcome.trade,
151 data: None,
152 },
153 )
154 })
155 .collect::<Vec<_>>();
156
157 let now = OffsetDateTime::now_utc().into();
158
159 let metadata = AuctionRepository::report(
163 &market.clone(),
164 id,
165 auth_outcomes.into_iter(),
166 product_outcomes.into_iter(),
167 now,
168 )
169 .await?;
170
171 if let Some(metadata) = metadata {
173 let _ = activity_sender.send_replace(Ok(Event::default()
174 .event("outcome")
175 .data(serde_json::to_string(&metadata).expect("infallible!"))));
176
177 for (product_id, outcome) in products {
180 if let Some(channel) = product_sender.get(&product_id) {
181 let update = Update {
182 from: metadata.from,
183 thru: metadata.thru,
184 outcome,
185 };
186 let _ = channel.send_replace(Ok(Event::default()
187 .event("outcome")
188 .data(serde_json::to_string(&update).expect("infallible!"))));
189 };
190 }
191
192 for (bidder_id, outcome) in submissions {
193 if let Some(channel) = bidder_sender.get(&bidder_id) {
194 let update = Update {
195 from: metadata.from,
196 thru: metadata.thru,
197 outcome,
198 };
199 let _ = channel.send_replace(Ok(Event::default()
200 .event("outcome")
201 .data(serde_json::to_string(&update).expect("infallible!"))));
202 }
203 }
204 }
205 }
206 Result::<(), T::Error>::Ok(())
207 })
208 };
209
210 let state = AppState {
211 jwt: JWTVerifier::from(api_secret),
212 market,
213 solve_queue: solve_sender,
214 activity_receiver,
215 product_sender,
216 bidder_sender,
217 };
218
219 (state, solver)
220}
221
222pub fn router<T: MarketRepository>(state: AppState<T>) -> Router {
224 let policy = cors::CorsLayer::new()
227 .allow_origin(cors::Any)
228 .allow_methods(cors::Any)
229 .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE]);
230
231 let app = Router::new()
233 .route("/health", axum::routing::get(health_check))
234 .nest("/v0/auths", routes::auths::router())
235 .nest("/v0/costs", routes::costs::router())
236 .nest("/v0/submissions", routes::submission::router(state.clone()))
238 .nest("/v0/products", routes::products::router())
240 .nest("/v0/outcomes", routes::outcomes::router())
242 .nest("/admin", routes::admin::router(state.clone()));
243
244 app.layer(policy).with_state(state)
245}
246
247pub async fn start<T: MarketRepository>(api_port: u16, api_secret: String, market: T) {
249 let listener = tokio::net::TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), api_port))
251 .await
252 .expect("Unable to bind local port");
253 tracing::info!(
254 "Listening for requests on {}",
255 listener.local_addr().unwrap()
256 );
257
258 let (app_state, solver) = state(&api_secret, market);
259
260 let server = tokio::spawn(async move {
262 axum::serve(listener, router(app_state).merge(openapi_router())).await
263 });
264
265 let _ = try_join!(solver, server).expect("shutdown");
266}