use crate::proto::obscura::v1 as proto;
use crate::services::key_service::KeyService;
use axum::extract::ws::Message as WsMessage;
use prost::Message as ProstMessage;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::Instrument;
use uuid::Uuid;
/// Handle to a background task that turns pre-key notifications into
/// `PreKeyStatus` frames on a device's outbound WebSocket channel.
///
/// Notifications are debounced and coalesced by the task, so calling
/// [`PreKeyPump::notify`] many times in quick succession results in at most
/// one status check per debounce window. Dropping the handle closes the
/// notification channel, which lets the background task finish.
#[derive(Debug)]
pub struct PreKeyPump {
    /// Wake-up channel to the background task. It is created with a capacity
    /// of one, so a pending notification absorbs any further ones.
    notify_tx: mpsc::Sender<()>,
}
impl PreKeyPump {
    /// Creates the pump and spawns its background task.
    ///
    /// The task is detached and runs inside a `prekey_pump` tracing span that
    /// carries `device.id`. It ends when this handle is dropped (the
    /// notification channel closes) or when sending on `outbound_tx` fails.
    ///
    /// # Parameters
    /// * `device_id` - device whose pre-key status is checked.
    /// * `key_service` - service queried via `check_pre_key_status`.
    /// * `outbound_tx` - channel that receives the encoded binary frames.
    /// * `debounce_interval_ms` - time, in milliseconds, to wait after the
    ///   first notification before checking the status.
    ///
    /// # Panics
    /// Panics if called outside of a Tokio runtime, because it uses
    /// `tokio::spawn`.
    pub fn new(
        device_id: Uuid,
        key_service: KeyService,
        outbound_tx: mpsc::Sender<WsMessage>,
        debounce_interval_ms: u64,
    ) -> Self {
        // A single slot is enough: one pending wake-up already guarantees
        // that a status check will run.
        let (notify_tx, notify_rx) = mpsc::channel(1);

        tokio::spawn(
            async move {
                Self::run_background(
                    device_id,
                    notify_rx,
                    key_service,
                    outbound_tx,
                    debounce_interval_ms,
                )
                .await;
            }
            .instrument(tracing::info_span!("prekey_pump", "device.id" = %device_id)),
        );

        Self { notify_tx }
    }

    /// Requests a pre-key status check without blocking.
    ///
    /// The request is dropped if one is already pending or if the background
    /// task has stopped.
    pub fn notify(&self) {
        // Ignoring the result is deliberate: `Full` means a check is already
        // queued, and `Closed` means there is no task left to wake up.
        let _ = self.notify_tx.try_send(());
    }

    /// Background loop: waits for a notification, debounces, checks the
    /// pre-key status and forwards it as a binary WebSocket frame.
    ///
    /// Returns when the notification channel is closed and drained, or when
    /// `outbound_tx` rejects a frame because its receiver is gone.
    async fn run_background(
        device_id: Uuid,
        mut rx: mpsc::Receiver<()>,
        key_service: KeyService,
        outbound_tx: mpsc::Sender<WsMessage>,
        debounce_interval_ms: u64,
    ) {
        let debounce = Duration::from_millis(debounce_interval_ms);

        while rx.recv().await.is_some() {
            // Give further notifications a chance to arrive, then discard
            // them so the whole burst results in a single status check.
            tokio::time::sleep(debounce).await;
            while rx.try_recv().is_ok() {}

            match key_service.check_pre_key_status(device_id).await {
                Ok(Some(status)) => {
                    let frame = proto::WebSocketFrame {
                        payload: Some(proto::web_socket_frame::Payload::PreKeyStatus(
                            proto::PreKeyStatus {
                                one_time_pre_key_count: status.one_time_pre_key_count,
                                min_threshold: status.min_threshold,
                            },
                        )),
                    };

                    // Encoding into a freshly allocated `Vec` cannot fail, so
                    // use the infallible helper rather than silently dropping
                    // an `Err` from `encode`.
                    let bytes = frame.encode_to_vec();

                    if outbound_tx
                        .send(WsMessage::Binary(bytes.into()))
                        .await
                        .is_err()
                    {
                        // The receiving side is gone; nothing left to do.
                        break;
                    }
                }
                Ok(None) => {
                    // The service returned no status, so no frame is sent.
                    tracing::debug!("PreKeyLow event coalesced, but user is no longer low on keys");
                }
                Err(e) => {
                    // Keep the loop alive; a later notification retries the check.
                    tracing::error!(error = %e, "Failed to check pre-key status for coalesced frame");
                }
            }
        }
    }
}