1#![warn(missing_docs)]
2#![doc = include_str!("../docs/workspace.md")]
8#![doc = include_str!("../README.md")]
9use fts_core::{
10 models::{AuthId, Outcome, ProductId, RawAuctionInput},
11 ports::{AuctionRepository, MarketRepository},
12};
13
14use axum::Router;
15use axum::http::header;
16use axum::response::sse::Event;
17use fts_solver::Solver;
18use fxhash::FxBuildHasher;
19use openapi::openapi_router;
20use serde::Serialize;
21use std::sync::Arc;
22use std::{convert::Infallible, net::SocketAddr};
23use time::OffsetDateTime;
24use tokio::sync::{mpsc, watch};
25use tokio::task::JoinHandle;
26use tokio::try_join;
27use tower_http::cors;
28
29mod openapi;
30mod routes;
31mod utils;
32
33pub use utils::CustomJWTClaims;
34use utils::JWTVerifier;
35pub use utils::Now;
36pub use utils::generate_jwt;
37
38type SenderMap<T> =
42 Arc<dashmap::DashMap<T, watch::Sender<Result<Event, Infallible>>, FxBuildHasher>>;
43
44#[derive(Clone)]
49pub struct AppState<T: MarketRepository> {
50 jwt: JWTVerifier,
52 market: T,
54 solve_queue: mpsc::Sender<RawAuctionInput<T::AuctionId>>,
56 activity_receiver: watch::Receiver<Result<Event, Infallible>>,
58 product_sender: SenderMap<ProductId>,
60 auth_sender: SenderMap<AuthId>,
62}
63
64#[derive(Serialize)]
68pub struct Update {
69 #[serde(with = "time::serde::rfc3339")]
71 pub from: OffsetDateTime,
72 #[serde(with = "time::serde::rfc3339")]
74 pub thru: OffsetDateTime,
75 #[serde(flatten)]
77 pub outcome: Outcome<()>,
78}
79
80pub fn state<T: MarketRepository>(
82 api_secret: &str,
83 market: T,
84) -> (AppState<T>, JoinHandle<Result<(), T::Error>>) {
85 let (solve_sender, mut solve_receiver) = mpsc::channel::<RawAuctionInput<T::AuctionId>>(24);
87
88 let (activity_sender, activity_receiver) = watch::channel(Ok(Event::default().comment("")));
90
91 let product_sender: SenderMap<ProductId> = Default::default();
92 let auth_sender: SenderMap<AuthId> = Default::default();
93
94 let solver = {
95 let market = market.clone();
96 let activity_sender = activity_sender.clone();
97 let product_sender = product_sender.clone();
98 let auth_sender = auth_sender.clone();
99 tokio::spawn(async move {
100 let solver = T::solver();
101 while let Some(auction) = solve_receiver.recv().await {
102 let id = auction.id.clone();
103
104 let submissions: Vec<fts_solver::Submission<_, _>> = auction.into();
106
107 let fts_solver::AuctionOutcome { auths, products } = solver.solve(&submissions);
109
110 let auth_outcomes = auths
112 .iter()
113 .map(|(auth_id, outcome)| {
114 (
115 auth_id.clone(),
116 Outcome {
117 price: outcome.price,
118 trade: outcome.trade,
119 data: None,
120 },
121 )
122 })
123 .collect::<Vec<_>>();
124
125 let product_outcomes = products
126 .iter()
127 .map(|(id, outcome)| {
128 (
129 id.clone(),
130 Outcome {
131 price: outcome.price,
132 trade: outcome.volume,
133 data: None,
134 },
135 )
136 })
137 .collect::<Vec<_>>();
138
139 let now = OffsetDateTime::now_utc().into();
140
141 let metadata = AuctionRepository::report(
145 &market.clone(),
146 id,
147 auth_outcomes.into_iter(),
148 product_outcomes.into_iter(),
149 now,
150 )
151 .await?;
152
153 if let Some(metadata) = metadata {
155 let _ = activity_sender.send_replace(Ok(Event::default()
156 .event("outcome")
157 .data(serde_json::to_string(&metadata).expect("infallible!"))));
158
159 for (product_id, product_outcome) in products {
162 if let Some(channel) = product_sender.get(&product_id) {
163 let update = Update {
164 from: metadata.from,
165 thru: metadata.thru,
166 outcome: Outcome {
167 price: product_outcome.price,
168 trade: product_outcome.volume,
169 data: None,
170 },
171 };
172 let _ = channel.send_replace(Ok(Event::default()
173 .event("outcome")
174 .data(serde_json::to_string(&update).expect("infallible!"))));
175 };
176 }
177
178 for (auth_id, auth_outcome) in auths {
179 if let Some(channel) = auth_sender.get(&auth_id) {
180 let update = Update {
181 from: metadata.from,
182 thru: metadata.thru,
183 outcome: Outcome {
184 price: auth_outcome.price,
185 trade: auth_outcome.trade,
186 data: None,
187 },
188 };
189 let _ = channel.send_replace(Ok(Event::default()
190 .event("outcome")
191 .data(serde_json::to_string(&update).expect("infallible!"))));
192 }
193 }
194 }
195 }
196 Result::<(), T::Error>::Ok(())
197 })
198 };
199
200 let state = AppState {
201 jwt: JWTVerifier::from(api_secret),
202 market,
203 solve_queue: solve_sender,
204 activity_receiver,
205 product_sender,
206 auth_sender,
207 };
208
209 (state, solver)
210}
211
212pub fn router<T: MarketRepository>(state: AppState<T>) -> Router {
214 let policy = cors::CorsLayer::new()
217 .allow_origin(cors::Any)
218 .allow_methods(cors::Any)
219 .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE]);
220
221 let app = Router::new()
223 .nest("/v0/auths", routes::auths::router())
224 .nest("/v0/costs", routes::costs::router())
225 .nest("/v0/submissions", routes::submission::router(state.clone()))
227 .nest("/v0/products", routes::products::router())
229 .nest("/v0/outcomes", routes::outcomes::router())
231 .nest("/admin", routes::admin::router(state.clone()));
232
233 app.layer(policy).with_state(state)
234}
235
236pub async fn start<T: MarketRepository>(api_port: u16, api_secret: String, market: T) {
238 let listener = tokio::net::TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), api_port))
240 .await
241 .expect("Unable to bind local port");
242 tracing::info!(
243 "Listening for requests on {}",
244 listener.local_addr().unwrap()
245 );
246
247 let (app_state, solver) = state(&api_secret, market);
248
249 let server = tokio::spawn(async move {
251 axum::serve(listener, router(app_state).merge(openapi_router())).await
252 });
253
254 let _ = try_join!(solver, server).expect("shutdown");
255}