use tokio::sync::mpsc;
use crate::context::Issue;
const EVENT_BUFFER: usize = 256;
#[derive(Clone)]
pub(super) struct EventProducer {
sender: mpsc::Sender<OrchestratorEvent>,
}
impl EventProducer {
pub(super) async fn intake_issue(&self, issue: Issue) {
self.send(OrchestratorEvent::Intake(IntakeEvent::Issue(issue))).await;
}
pub(super) async fn intake_failed(&self, error: impl ToString) {
self
.send(OrchestratorEvent::Intake(IntakeEvent::Failed(error.to_string())))
.await;
}
pub(super) async fn intake_stopped(&self) {
self.send(OrchestratorEvent::Intake(IntakeEvent::Stopped)).await;
}
async fn send(&self, event: OrchestratorEvent) {
if self.sender.send(event).await.is_err() {
tracing::debug!("orchestrator event receiver dropped");
}
}
}
pub(super) struct EventConsumer {
receiver: mpsc::Receiver<OrchestratorEvent>,
}
impl EventConsumer {
pub(super) async fn recv(&mut self) -> Option<OrchestratorEvent> {
self.receiver.recv().await
}
}
pub(super) fn event_channel() -> (EventProducer, EventConsumer) {
let (sender, receiver) = mpsc::channel(EVENT_BUFFER);
(EventProducer { sender }, EventConsumer { receiver })
}
#[allow(clippy::large_enum_variant)]
pub(super) enum OrchestratorEvent {
Intake(IntakeEvent),
}
pub(super) enum IntakeEvent {
Issue(Issue),
Failed(String),
Stopped,
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn producer_delivers_intake_events_in_order() {
let (producer, mut consumer) = event_channel();
producer.intake_failed("pull failed").await;
producer.intake_stopped().await;
match consumer.recv().await.expect("first event") {
OrchestratorEvent::Intake(IntakeEvent::Failed(error)) => assert_eq!(error, "pull failed"),
_ => panic!("expected intake failure"),
}
match consumer.recv().await.expect("second event") {
OrchestratorEvent::Intake(IntakeEvent::Stopped) => {},
_ => panic!("expected intake stopped"),
}
}
}