use std::error::Error;
use futures_core::Stream;
use poem::listener::TcpListener;
use poem::middleware::AddData;
use poem::web::Data;
use poem::{get, handler, post, Body, EndpointExt, Request, Response, Route, Server};
use tokio::sync::mpsc::{self, Sender};
use tracing::{error, info, warn};
use crate::{CallbackResponse, MomoUpdates};
#[derive(Debug, Clone)]
pub struct CallbackServerConfig {
pub http_port: u16,
pub host: String,
}
impl Default for CallbackServerConfig {
fn default() -> Self {
Self {
http_port: 8500,
host: "127.0.0.1".to_string(),
}
}
}
#[handler]
async fn health_check() -> &'static str {
"OK"
}
#[handler]
async fn mtn_callback_handler(
req: &Request,
mut body: Body,
sender: Data<&Sender<MomoUpdates>>,
) -> Result<Response, poem::Error> {
let remote_address = req.remote_addr().to_string();
let body_string = body.into_string().await?;
info!("Received callback from {}", remote_address);
info!("Raw callback body: {}", body_string);
let response_result: Result<CallbackResponse, serde_json::Error> =
serde_json::from_str(&body_string);
match response_result {
Ok(callback_response) => {
let momo_updates = MomoUpdates {
remote_address,
response: callback_response,
};
if let Err(e) = sender.send(momo_updates).await {
error!("Failed to send callback update: {}", e);
} else {
info!("Successfully processed callback");
}
}
Err(e) => {
warn!(
"Failed to parse callback body: {} - Body: {}",
e, body_string
);
}
}
Ok(Response::builder()
.status(poem::http::StatusCode::OK)
.header("Content-Type", "application/json")
.body(r#"{"status": "success", "message": "Callback received successfully"}"#))
}
fn create_callback_routes() -> Route {
Route::new()
.at(
"/collection_request_to_pay",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/collection_request_to_withdraw_v1",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/collection_request_to_withdraw_v2",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/collection_invoice",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/collection_payment",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/collection_preapproval",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/disbursement_deposit_v1",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/disbursement_deposit_v2",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/disbursement_refund_v1",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/disbursement_refund_v2",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/disbursement_transfer",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/remittance_cash_transfer",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at(
"/remittance_transfer",
post(mtn_callback_handler).put(mtn_callback_handler),
)
.at("/health", get(health_check))
}
pub async fn start_callback_server(
config: CallbackServerConfig,
) -> Result<impl Stream<Item = MomoUpdates>, Box<dyn Error>> {
info!("Starting MTN MoMo Callback Server");
info!("Host: {}, Port: {}", config.host, config.http_port);
let (tx, mut rx) = mpsc::channel::<MomoUpdates>(100);
let app = create_callback_routes()
.with(poem::middleware::Tracing)
.with(poem::middleware::Cors::new())
.with(poem::middleware::Compression::default())
.with(poem::middleware::RequestId::default())
.with(AddData::new(tx.clone()));
let bind_address = format!("{}:{}", config.host, config.http_port);
info!("Binding server to address: {}", bind_address);
tokio::spawn(async move {
let listener = TcpListener::bind(&bind_address);
match Server::new(listener)
.run_with_graceful_shutdown(
app,
async {
tokio::signal::ctrl_c().await.ok();
info!("Received shutdown signal, stopping server...");
},
None,
)
.await
{
Ok(_) => info!("Server stopped successfully"),
Err(e) => error!("Server error: {}", e),
}
});
info!("MTN MoMo Callback Server started successfully");
Ok(async_stream::stream! {
while let Some(msg) = rx.recv().await {
yield msg;
}
})
}