#![warn(missing_docs)]
#![doc = include_str!("../README.md")]
use fts_core::{
models::{BidderId, Outcome, ProductId, RawAuctionInput},
ports::{AuctionRepository, MarketRepository, ProductRepository},
};
use axum::Json;
use axum::Router;
use axum::http::header;
use axum::response::sse::Event;
use fts_solver::Solver;
use openapi::openapi_router;
use rustc_hash::FxBuildHasher;
use serde::Serialize;
use std::sync::Arc;
use std::{convert::Infallible, net::SocketAddr};
use time::OffsetDateTime;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
use tower_http::cors;
mod openapi;
mod routes;
mod utils;
pub use crate::openapi::MarketplaceApi;
pub use utils::CustomJWTClaims;
use utils::JWTVerifier;
pub use utils::Now;
pub use utils::generate_jwt;
type SenderMap<T> =
Arc<dashmap::DashMap<T, watch::Sender<Result<Event, Infallible>>, FxBuildHasher>>;
#[derive(Clone)]
pub struct AppState<T: MarketRepository> {
jwt: JWTVerifier,
market: T,
solve_queue: mpsc::Sender<RawAuctionInput<T::AuctionId>>,
activity_receiver: watch::Receiver<Result<Event, Infallible>>,
product_sender: SenderMap<ProductId>,
bidder_sender: SenderMap<BidderId>,
}
impl<T: MarketRepository> AppState<T> {
pub async fn solve(
&self,
from: Option<OffsetDateTime>,
thru: OffsetDateTime,
by: Option<time::Duration>,
timestamp: OffsetDateTime,
) -> Result<(), T::Error> {
let data = self.market.prepare(from, thru, by, timestamp).await?;
if let Some(auctions) = data {
for auction in auctions {
self.solve_queue
.send(auction)
.await
.expect("queue capacity exceeded");
}
Ok(())
} else {
Ok(())
}
}
}
#[derive(Serialize)]
pub struct Update<T> {
#[serde(with = "time::serde::rfc3339")]
pub from: OffsetDateTime,
#[serde(with = "time::serde::rfc3339")]
pub thru: OffsetDateTime,
#[serde(flatten)]
pub outcome: T,
}
#[derive(Serialize)]
struct HealthResponse {
status: String,
}
async fn health_check() -> Json<HealthResponse> {
Json(HealthResponse {
status: "ok".to_string(),
})
}
pub fn state<T: MarketRepository>(
api_secret: &str,
market: T,
) -> (AppState<T>, JoinHandle<Result<(), T::Error>>) {
let (solve_sender, mut solve_receiver) = mpsc::channel::<RawAuctionInput<T::AuctionId>>(24);
let (activity_sender, activity_receiver) = watch::channel(Ok(Event::default().comment("")));
let product_sender: SenderMap<ProductId> = Default::default();
let bidder_sender: SenderMap<BidderId> = Default::default();
let solver = {
let market = market.clone();
let activity_sender = activity_sender.clone();
let product_sender = product_sender.clone();
let bidder_sender = bidder_sender.clone();
tokio::spawn(async move {
let solver = T::solver();
while let Some(auction) = solve_receiver.recv().await {
let id = auction.id.clone();
let submissions = auction.into_solver();
let fts_solver::AuctionOutcome {
submissions,
products,
} = solver.solve(&submissions).expect("could not solve auction");
let auth_outcomes = submissions
.values()
.flat_map(|outcome| {
outcome.iter().map(|(auth_id, auth_outcome)| {
(
auth_id.clone(),
Outcome {
price: auth_outcome.price,
trade: auth_outcome.trade,
data: None,
},
)
})
})
.collect::<Vec<_>>();
let product_outcomes = products
.iter()
.map(|(product_id, product_outcome)| {
(
product_id.clone(),
Outcome {
price: product_outcome.price,
trade: product_outcome.trade,
data: None,
},
)
})
.collect::<Vec<_>>();
let now = OffsetDateTime::now_utc().into();
let metadata = AuctionRepository::report(
&market.clone(),
id,
auth_outcomes.into_iter(),
product_outcomes.into_iter(),
now,
)
.await?;
if let Some(metadata) = metadata {
let _ = activity_sender.send_replace(Ok(Event::default()
.event("outcome")
.data(serde_json::to_string(&metadata).expect("infallible!"))));
for (product_id, outcome) in products {
if let Some(channel) = product_sender.get(&product_id) {
let update = Update {
from: metadata.from,
thru: metadata.thru,
outcome,
};
let _ = channel.send_replace(Ok(Event::default()
.event("outcome")
.data(serde_json::to_string(&update).expect("infallible!"))));
};
}
for (bidder_id, outcome) in submissions {
if let Some(channel) = bidder_sender.get(&bidder_id) {
let update = Update {
from: metadata.from,
thru: metadata.thru,
outcome,
};
let _ = channel.send_replace(Ok(Event::default()
.event("outcome")
.data(serde_json::to_string(&update).expect("infallible!"))));
}
}
}
}
Result::<(), T::Error>::Ok(())
})
};
let state = AppState {
jwt: JWTVerifier::from(api_secret),
market,
solve_queue: solve_sender,
activity_receiver,
product_sender,
bidder_sender,
};
(state, solver)
}
pub fn router<T: MarketRepository>(state: AppState<T>) -> Router {
let policy = cors::CorsLayer::new()
.allow_origin(cors::Any)
.allow_methods(cors::Any)
.allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE]);
let app = Router::new()
.route("/health", axum::routing::get(health_check))
.nest("/v0/auths", routes::auths::router())
.nest("/v0/costs", routes::costs::router())
.nest("/v0/submissions", routes::submission::router(state.clone()))
.nest("/v0/products", routes::products::router())
.nest("/v0/outcomes", routes::outcomes::router())
.nest("/admin", routes::admin::router(state.clone()));
app.layer(policy).with_state(state)
}
pub struct Server<T: MarketRepository> {
pub server: JoinHandle<Result<(), std::io::Error>>,
pub solver: JoinHandle<Result<(), <T as ProductRepository>::Error>>,
}
impl<T: MarketRepository> Server<T> {
pub async fn new(api_port: u16, api_secret: String, market: T) -> (Self, AppState<T>) {
let listener =
tokio::net::TcpListener::bind(SocketAddr::new([0, 0, 0, 0].into(), api_port))
.await
.expect("Unable to bind local port");
tracing::info!(
"Listening for requests on {}",
listener.local_addr().unwrap()
);
let (state, solver) = state(&api_secret, market);
let state_clone = state.clone();
let server = tokio::spawn(async move {
axum::serve(listener, router(state_clone).merge(openapi_router())).await
});
(Self { server, solver }, state)
}
}