use std::{
future::Future,
pin::Pin,
task::Poll,
time::{Duration, Instant},
};
use acceptxmr::{
storage::stores::InMemory, Invoice, InvoiceId, PaymentGateway, PaymentGatewayBuilder,
Subscriber,
};
use actix::{prelude::Stream, Actor, ActorContext, AsyncContext, StreamHandler};
use actix_files::Files;
use actix_session::{
config::CookieContentSecurity, storage::CookieSessionStore, Session, SessionMiddleware,
};
use actix_web::{
cookie, get,
http::header::{CacheControl, CacheDirective},
post, web,
web::Data,
App, HttpRequest, HttpResponse, HttpServer,
};
use actix_web_actors::ws;
use bytestring::ByteString;
use log::{debug, error, info, warn, LevelFilter};
use rand::{thread_rng, Rng};
use serde::Deserialize;
use serde_json::json;
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
const SESSION_KEY_LEN: usize = 64;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::builder()
.filter_level(LevelFilter::Warn)
.filter_module("acceptxmr", log::LevelFilter::Debug)
.filter_module("websockets", log::LevelFilter::Trace)
.init();
let private_view_key = "ad2093a5705b9f33e6f0f0c1bc1f5f639c756cdfc168c8f2ac6127ccbdab3a03";
let primary_address = "4613YiHLM6JMH4zejMB2zJY5TwQCxL8p65ufw8kBP5yxX9itmuGLqp1dS4tkVoTxjyH3aYhYNrtGHbQzJQP5bFus3KHVdmf";
let invoice_store = InMemory::new();
let payment_gateway = PaymentGatewayBuilder::new(
private_view_key.to_string(),
primary_address.to_string(),
invoice_store,
)
.daemon_url("http://xmr-node.cakewallet.com:18081".to_string())
.build()
.await
.expect("failed to build payment gateway");
info!("Payment gateway created.");
payment_gateway
.run()
.await
.expect("failed to run payment gateway");
info!("Payment gateway running.");
let gateway_copy = payment_gateway.clone();
tokio::spawn(async move {
let mut subscriber = gateway_copy.subscribe_all();
loop {
let Some(invoice) = subscriber.blocking_recv() else {
panic!("Blockchain scanner crashed!")
};
if (invoice.is_confirmed() && invoice.creation_height() < invoice.current_height())
|| invoice.is_expired()
{
debug!(
"Invoice to index {} is either confirmed or expired. Removing invoice now",
invoice.index()
);
if let Err(e) = gateway_copy.remove_invoice(invoice.id()).await {
error!("Failed to remove fully confirmed invoice: {}", e);
};
}
}
});
let mut key_arr = [0u8; SESSION_KEY_LEN];
thread_rng().fill(&mut key_arr[..]);
let session_key = cookie::Key::generate();
let shared_payment_gateway = Data::new(payment_gateway);
HttpServer::new(move || {
App::new()
.wrap(
SessionMiddleware::builder(CookieSessionStore::default(), session_key.clone())
.cookie_name("acceptxmr_session".to_string())
.cookie_secure(false)
.cookie_content_security(CookieContentSecurity::Private)
.build(),
)
.app_data(shared_payment_gateway.clone())
.service(update)
.service(checkout)
.service(websocket)
.service(
Files::new("", "./library/examples/websockets/static").index_file("index.html"),
)
})
.bind("0.0.0.0:8080")?
.run()
.await
}
#[derive(Deserialize)]
struct CheckoutInfo {
message: String,
}
#[allow(clippy::unused_async)]
#[post("/checkout")]
async fn checkout(
session: Session,
checkout_info: web::Json<CheckoutInfo>,
payment_gateway: web::Data<PaymentGateway<InMemory>>,
) -> Result<HttpResponse, actix_web::Error> {
let invoice_id = payment_gateway
.new_invoice(1_000_000_000, 2, 5, checkout_info.message.clone())
.await
.unwrap();
session.insert("id", invoice_id)?;
Ok(HttpResponse::Ok()
.append_header(CacheControl(vec![CacheDirective::NoStore]))
.finish())
}
#[allow(clippy::unused_async)]
#[get("/update")]
async fn update(
session: Session,
payment_gateway: web::Data<PaymentGateway<InMemory>>,
) -> Result<HttpResponse, actix_web::Error> {
if let Ok(Some(invoice_id)) = session.get::<InvoiceId>("id") {
if let Ok(Some(invoice)) = payment_gateway.get_invoice(invoice_id).await {
return Ok(HttpResponse::Ok()
.append_header(CacheControl(vec![CacheDirective::NoStore]))
.json(json!(
{
"address": invoice.address(),
"amount_paid": invoice.amount_paid(),
"amount_requested": invoice.amount_requested(),
"uri": invoice.uri(),
"confirmations": invoice.confirmations(),
"confirmations_required": invoice.confirmations_required(),
"expiration_in": invoice.expiration_in(),
}
)));
};
}
Ok(HttpResponse::Gone()
.append_header(CacheControl(vec![CacheDirective::NoStore]))
.finish())
}
#[allow(clippy::unused_async)]
#[get("/ws/")]
async fn websocket(
session: Session,
req: HttpRequest,
stream: web::Payload,
payment_gateway: web::Data<PaymentGateway<InMemory>>,
) -> Result<HttpResponse, actix_web::Error> {
let Ok(Some(invoice_id)) = session.get::<InvoiceId>("id") else {
return Ok(HttpResponse::NotFound()
.append_header(CacheControl(vec![CacheDirective::NoStore]))
.finish());
};
let Some(subscriber) = payment_gateway.subscribe(invoice_id) else {
return Ok(HttpResponse::NotFound()
.append_header(CacheControl(vec![CacheDirective::NoStore]))
.finish());
};
let websocket = WebSocket::new(subscriber);
ws::start(websocket, &req, stream)
}
struct WebSocket {
last_heartbeat: Instant,
invoice_subscriber: Option<Subscriber>,
}
impl WebSocket {
fn new(invoice_subscriber: Subscriber) -> Self {
Self {
last_heartbeat: Instant::now(),
invoice_subscriber: Some(invoice_subscriber),
}
}
fn heartbeat(ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.last_heartbeat) > CLIENT_TIMEOUT {
warn!("Websocket Client heartbeat failed, disconnecting!");
ctx.stop();
return;
}
ctx.ping(b"");
});
}
}
impl Actor for WebSocket {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
if let Some(subscriber) = self.invoice_subscriber.take() {
<WebSocket as StreamHandler<Invoice>>::add_stream(InvoiceStream(subscriber), ctx);
}
Self::heartbeat(ctx);
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Pong(_)) => {
self.last_heartbeat = Instant::now();
}
Ok(ws::Message::Close(reason)) => {
match &reason {
Some(r) => debug!("Websocket client closing: {:#?}", r.description),
None => debug!("Websocket client closing"),
}
ctx.close(reason);
ctx.stop();
}
Ok(m) => debug!("Received unexpected message from websocket client: {:?}", m),
Err(e) => warn!("Received error from websocket client: {:?}", e),
}
}
}
impl StreamHandler<Invoice> for WebSocket {
fn handle(&mut self, invoice_update: Invoice, ctx: &mut Self::Context) {
ctx.text(ByteString::from(
json!(
{
"address": invoice_update.address(),
"amount_paid": invoice_update.amount_paid(),
"amount_requested": invoice_update.amount_requested(),
"uri": invoice_update.uri(),
"confirmations": invoice_update.confirmations(),
"confirmations_required": invoice_update.confirmations_required(),
"expiration_in": invoice_update.expiration_in(),
}
)
.to_string(),
));
if invoice_update.is_confirmed() {
ctx.close(Some(ws::CloseReason::from((
ws::CloseCode::Normal,
"Invoice Complete",
))));
ctx.stop();
} else if invoice_update.is_expired() {
ctx.close(Some(ws::CloseReason::from((
ws::CloseCode::Normal,
"Invoice Expired",
))));
ctx.stop();
}
}
}
struct InvoiceStream(Subscriber);
impl Stream for InvoiceStream {
type Item = Invoice;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll(cx)
}
}