1#![warn(missing_docs)]
2#![doc = include_str!("../README.md")]
8use fts_core::{
9 models::{BidderId, Outcome, ProductId, RawAuctionInput},
10 ports::{AuctionRepository, MarketRepository, ProductRepository},
11};
12
13use axum::Json;
14use axum::Router;
15use axum::http::header;
16use axum::response::sse::Event;
17use fts_solver::Solver;
18use openapi::openapi_router;
19use rustc_hash::FxBuildHasher;
20use serde::Serialize;
21use std::sync::Arc;
22use std::{convert::Infallible, net::SocketAddr};
23use time::OffsetDateTime;
24use tokio::sync::{mpsc, watch};
25use tokio::task::JoinHandle;
26use tower_http::cors;
27
28mod openapi;
29mod routes;
30mod utils;
31
32pub use crate::openapi::MarketplaceApi;
33pub use utils::CustomJWTClaims;
34use utils::JWTVerifier;
35pub use utils::Now;
36pub use utils::generate_jwt;
37
/// Shared map from a key of type `T` to the `watch::Sender` on which
/// server-sent events for that key are published.
///
/// The map lives behind an `Arc`, so cloning a `SenderMap` yields another
/// handle to the same underlying `DashMap` rather than a copy of it.
type SenderMap<T> =
    Arc<dashmap::DashMap<T, watch::Sender<Result<Event, Infallible>>, FxBuildHasher>>;
43
/// State shared between the HTTP handlers and the background solver task.
///
/// Instances are created by [`state()`] and installed on the axum `Router`
/// by [`router()`].
#[derive(Clone)]
pub struct AppState<T: MarketRepository> {
    /// JWT verifier constructed from the API secret passed to [`state()`].
    jwt: JWTVerifier,
    /// The market repository implementation.
    market: T,
    /// Sending half of the bounded queue of auctions awaiting the solver task.
    solve_queue: mpsc::Sender<RawAuctionInput<T::AuctionId>>,
    /// Receiving half of the activity channel; the solver task publishes an
    /// `"outcome"` event on it for each auction whose report yields metadata.
    activity_receiver: watch::Receiver<Result<Event, Infallible>>,
    /// Per-product channels the solver task publishes product outcomes on.
    product_sender: SenderMap<ProductId>,
    /// Per-bidder channels the solver task publishes bidder outcomes on.
    bidder_sender: SenderMap<BidderId>,
}
63
64impl<T: MarketRepository> AppState<T> {
65 pub async fn solve(
67 &self,
68 from: Option<OffsetDateTime>,
69 thru: OffsetDateTime,
70 by: Option<time::Duration>,
71 timestamp: OffsetDateTime,
72 ) -> Result<(), T::Error> {
73 let data = self.market.prepare(from, thru, by, timestamp).await?;
74 if let Some(auctions) = data {
75 for auction in auctions {
76 self.solve_queue
77 .send(auction)
78 .await
79 .expect("queue capacity exceeded");
80 }
81 Ok(())
82 } else {
83 Ok(())
84 }
85 }
86}
87
/// Serializable envelope pairing an outcome with the `from`/`thru`
/// timestamps of the auction it belongs to.
#[derive(Serialize)]
pub struct Update<T> {
    /// The `from` timestamp, serialized as an RFC 3339 string.
    #[serde(with = "time::serde::rfc3339")]
    pub from: OffsetDateTime,
    /// The `thru` timestamp, serialized as an RFC 3339 string.
    #[serde(with = "time::serde::rfc3339")]
    pub thru: OffsetDateTime,
    /// The outcome payload; its fields are flattened into the same
    /// serialized object as `from` and `thru`.
    #[serde(flatten)]
    pub outcome: T,
}
103
/// Serializable response body for the health-check endpoint.
#[derive(Serialize)]
struct HealthResponse {
    /// Status string reported to the caller.
    status: String,
}
109
110async fn health_check() -> Json<HealthResponse> {
112 Json(HealthResponse {
113 status: "ok".to_string(),
114 })
115}
116
/// Builds the shared [`AppState`] and spawns the background solver task.
///
/// * `api_secret` is used to construct the JWT verifier stored in the state.
/// * `market` is the repository implementation; one clone is moved into the
///   solver task and the original is stored in the returned state.
///
/// Returns the state together with the `JoinHandle` of the solver task. The
/// task loops over the solve queue: for each auction it runs the solver,
/// reports the outcomes through `AuctionRepository::report`, and publishes
/// `"outcome"` events on the activity, per-product and per-bidder channels.
/// It finishes with `Ok(())` once the queue yields `None`, or with the
/// repository error as soon as `report` fails.
///
/// This function calls `tokio::spawn`, so it must run inside a Tokio runtime.
///
/// # Panics
///
/// The spawned task (not this function) panics if the solver fails to solve
/// an auction or if serializing an event payload to JSON fails.
pub fn state<T: MarketRepository>(
    api_secret: &str,
    market: T,
) -> (AppState<T>, JoinHandle<Result<(), T::Error>>) {
    // Bounded queue (capacity 24) feeding auctions to the solver task.
    let (solve_sender, mut solve_receiver) = mpsc::channel::<RawAuctionInput<T::AuctionId>>(24);

    // Activity channel, seeded with an SSE comment event whose text is empty.
    let (activity_sender, activity_receiver) = watch::channel(Ok(Event::default().comment("")));

    // Both maps start out empty; nothing in this function inserts into them.
    let product_sender: SenderMap<ProductId> = Default::default();
    let bidder_sender: SenderMap<BidderId> = Default::default();

    let solver = {
        // Clone the handles that are moved into the task so the originals
        // remain available for the `AppState` constructed below.
        let market = market.clone();
        let activity_sender = activity_sender.clone();
        let product_sender = product_sender.clone();
        let bidder_sender = bidder_sender.clone();
        tokio::spawn(async move {
            let solver = T::solver();
            // Runs until `recv` yields `None`.
            while let Some(auction) = solve_receiver.recv().await {
                // Keep the id: `into_solver` consumes the auction.
                let id = auction.id.clone();

                let submissions = auction.into_solver();

                // From here on `submissions` names the solver's per-submission
                // results, shadowing the solver input above.
                let fts_solver::AuctionOutcome {
                    submissions,
                    products,
                } = solver.solve(&submissions).expect("could not solve auction");

                // Flatten the nested results into `(auth_id, Outcome)` pairs
                // for the repository; `data` is left unset.
                let auth_outcomes = submissions
                    .values()
                    .flat_map(|outcome| {
                        outcome.iter().map(|(auth_id, auth_outcome)| {
                            (
                                auth_id.clone(),
                                Outcome {
                                    price: auth_outcome.price,
                                    trade: auth_outcome.trade,
                                    data: None,
                                },
                            )
                        })
                    })
                    .collect::<Vec<_>>();

                // Likewise, one `(product_id, Outcome)` pair per product.
                let product_outcomes = products
                    .iter()
                    .map(|(product_id, product_outcome)| {
                        (
                            product_id.clone(),
                            Outcome {
                                price: product_outcome.price,
                                trade: product_outcome.trade,
                                data: None,
                            },
                        )
                    })
                    .collect::<Vec<_>>();

                // Current UTC time, converted into the timestamp type that
                // `report` expects.
                let now = OffsetDateTime::now_utc().into();

                // A repository error here ends the task via `?`.
                // NOTE(review): `market` is already owned by this task, so the
                // `.clone()` below looks redundant — confirm before removing.
                let metadata = AuctionRepository::report(
                    &market.clone(),
                    id,
                    auth_outcomes.into_iter(),
                    product_outcomes.into_iter(),
                    now,
                )
                .await?;

                // Events are only published when the report yields metadata.
                if let Some(metadata) = metadata {
                    // `send_replace` hands back the previous value, which is
                    // intentionally discarded.
                    let _ = activity_sender.send_replace(Ok(Event::default()
                        .event("outcome")
                        .data(serde_json::to_string(&metadata).expect("infallible!"))));

                    for (product_id, outcome) in products {
                        // Products with no entry in the map are skipped.
                        if let Some(channel) = product_sender.get(&product_id) {
                            let update = Update {
                                from: metadata.from,
                                thru: metadata.thru,
                                outcome,
                            };
                            let _ = channel.send_replace(Ok(Event::default()
                                .event("outcome")
                                .data(serde_json::to_string(&update).expect("infallible!"))));
                        };
                    }

                    for (bidder_id, outcome) in submissions {
                        // Bidders with no entry in the map are skipped.
                        if let Some(channel) = bidder_sender.get(&bidder_id) {
                            let update = Update {
                                from: metadata.from,
                                thru: metadata.thru,
                                outcome,
                            };
                            let _ = channel.send_replace(Ok(Event::default()
                                .event("outcome")
                                .data(serde_json::to_string(&update).expect("infallible!"))));
                        }
                    }
                }
            }
            // Explicit type so the async block's `Result` error type is known.
            Result::<(), T::Error>::Ok(())
        })
    };

    let state = AppState {
        jwt: JWTVerifier::from(api_secret),
        market,
        solve_queue: solve_sender,
        activity_receiver,
        product_sender,
        bidder_sender,
    };

    (state, solver)
}
244
245pub fn router<T: MarketRepository>(state: AppState<T>) -> Router {
247 let policy = cors::CorsLayer::new()
250 .allow_origin(cors::Any)
251 .allow_methods(cors::Any)
252 .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE]);
253
254 let app = Router::new()
256 .route("/health", axum::routing::get(health_check))
257 .nest("/v0/auths", routes::auths::router())
258 .nest("/v0/costs", routes::costs::router())
259 .nest("/v0/submissions", routes::submission::router(state.clone()))
261 .nest("/v0/products", routes::products::router())
263 .nest("/v0/outcomes", routes::outcomes::router())
265 .nest("/admin", routes::admin::router(state.clone()));
266
267 app.layer(policy).with_state(state)
268}
269
/// Join handles for the two background tasks started by [`Server::new`].
pub struct Server<T: MarketRepository> {
    /// Handle of the task running the HTTP server.
    pub server: JoinHandle<Result<(), std::io::Error>>,
    /// Handle of the task running the auction solver loop.
    pub solver: JoinHandle<Result<(), <T as ProductRepository>::Error>>,
}
277
278impl<T: MarketRepository> Server<T> {
279 pub async fn new(api_port: u16, api_secret: String, market: T) -> (Self, AppState<T>) {
281 let listener =
283 tokio::net::TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), api_port))
284 .await
285 .expect("Unable to bind local port");
286 tracing::info!(
287 "Listening for requests on {}",
288 listener.local_addr().unwrap()
289 );
290
291 let (state, solver) = state(&api_secret, market);
292
293 let state_clone = state.clone();
295 let server = tokio::spawn(async move {
296 axum::serve(listener, router(state_clone).merge(openapi_router())).await
297 });
298
299 (Self { server, solver }, state)
300 }
301}