use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast::error::RecvError;
use tokio_stream::Stream;
use tonic::{Request, Response, Status, Streaming};
use aa_proto::assembly::gateway::v1::invalidation_service_server::InvalidationService;
use aa_proto::assembly::gateway::v1::{subscribe_request::Kind, InvalidationEvent, SubscribeRequest};
use super::InvalidationHub;
/// Implementation of the `InvalidationService` gRPC trait backed by a shared
/// [`InvalidationHub`].
///
/// Cloning is cheap: every clone shares the same hub through an [`Arc`].
#[derive(Clone)]
pub struct InvalidationServiceImpl {
    /// Hub that subscriptions and acknowledgements are forwarded to.
    hub: Arc<InvalidationHub>,
}
impl InvalidationServiceImpl {
pub fn new(hub: Arc<InvalidationHub>) -> Self {
Self { hub }
}
}
/// Aborts the wrapped ack-reader task when dropped.
///
/// Dropping a bare `JoinHandle` only detaches the task, so without this guard
/// the reader would keep running (and keep its captured hub reference and
/// inbound stream alive) after the outbound event stream has gone away.
struct AckReaderGuard(tokio::task::JoinHandle<()>);

impl Drop for AckReaderGuard {
    fn drop(&mut self) {
        self.0.abort();
    }
}

#[tonic::async_trait]
impl InvalidationService for InvalidationServiceImpl {
    /// Boxed, type-erased stream of invalidation events returned by `subscribe`.
    type SubscribeStream =
        Pin<Box<dyn Stream<Item = Result<InvalidationEvent, Status>> + Send + 'static>>;

    /// Opens a bidirectional invalidation subscription.
    ///
    /// The first inbound message selects the assembly and, when it carries
    /// `Kind::Initial`, the last sequence number the client has already seen.
    /// Later inbound `Kind::Ack` messages are forwarded to the hub by a
    /// background task. The returned stream first yields the hub's replay
    /// events, then live events until the hub's channel closes.
    ///
    /// # Errors
    ///
    /// Returns `Status::invalid_argument` when the client closes the stream
    /// before sending a first message or sends an empty `assembly_id`, and
    /// propagates any `Status` raised while reading that first message.
    async fn subscribe(
        &self,
        request: Request<Streaming<SubscribeRequest>>,
    ) -> Result<Response<Self::SubscribeStream>, Status> {
        let mut inbound = request.into_inner();

        let Some(first) = inbound.message().await? else {
            return Err(Status::invalid_argument(
                "stream closed before initial SubscribeRequest",
            ));
        };
        if first.assembly_id.is_empty() {
            return Err(Status::invalid_argument("assembly_id is required"));
        }

        let assembly_id = first.assembly_id;
        // Any first message that is not `Initial` (or has no kind) starts from 0.
        let last_seq_seen = match first.kind {
            Some(Kind::Initial(initial)) => initial.last_seq_seen,
            _ => 0,
        };

        let handle = self.hub.subscribe(assembly_id.clone(), last_seq_seen);

        // Forward client acks to the hub for as long as the subscription lives.
        // The loop ends on its own when the inbound stream finishes or errors;
        // the guard additionally aborts it once the outbound stream is dropped.
        let hub = Arc::clone(&self.hub);
        let ack_reader = AckReaderGuard(tokio::spawn(async move {
            while let Ok(Some(message)) = inbound.message().await {
                if let Some(Kind::Ack(ack)) = message.kind {
                    hub.ack(&assembly_id, ack.seq);
                }
            }
        }));

        let super::SubscriptionHandle { replay, mut receiver } = handle;
        let stream = async_stream::try_stream! {
            // Move the guard into the stream so the ack reader is aborted when
            // the stream completes or is dropped.
            let _ack_reader = ack_reader;

            for event in replay {
                yield event;
            }
            loop {
                match receiver.recv().await {
                    Ok(event) => yield event,
                    // NOTE(review): events missed while lagging are skipped
                    // silently; the client is not told about the gap — confirm
                    // this best-effort behavior is intended.
                    Err(RecvError::Lagged(_)) => continue,
                    Err(RecvError::Closed) => break,
                }
            }
        };

        Ok(Response::new(Box::pin(stream)))
    }
}