use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use crate::ops::OperationId;
use crate::ops_lifecycle::{OperationKind, OperationTerminalOutcome};
/// Sequence number type used both for individual [`CompletionEntry::seq`]
/// values and for feed watermarks (see [`CompletionFeed::watermark`]).
pub type CompletionSeq = u64;
/// One record in a completion feed, describing an operation that reached a
/// terminal outcome.
///
/// The type is serde-serializable; field order and names are part of the
/// serialized form.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompletionEntry {
/// Sequence number of this entry within its feed.
pub seq: CompletionSeq,
/// Identifier of the operation this entry refers to.
pub operation_id: OperationId,
/// Kind of the operation.
pub kind: OperationKind,
/// Name of the operation intended for display.
pub display_name: String,
/// Terminal outcome recorded for the operation.
pub terminal_outcome: OperationTerminalOutcome,
/// Completion time, if known.
// NOTE(review): presumably milliseconds since some epoch (the `_ms` suffix);
// the reference point is not visible here — confirm against the producer.
pub completed_at_ms: Option<u64>,
}
/// Result of [`CompletionFeed::list_since`]: a set of entries together with a
/// watermark value reported by the feed.
#[derive(Debug, Clone)]
pub struct CompletionBatch {
/// Entries returned by the feed for the query.
pub entries: Vec<CompletionEntry>,
/// Watermark reported by the feed alongside `entries`.
// NOTE(review): presumably a caller passes this back as `after_seq` on the
// next `list_since` call — confirm with the feed implementations.
pub watermark: CompletionSeq,
}
/// A source of [`CompletionEntry`] records addressed by [`CompletionSeq`].
///
/// Implementors must be `Send + Sync` and implement `Debug`. The trait uses a
/// boxed future rather than `async fn`, which keeps it usable as a trait
/// object.
pub trait CompletionFeed: Send + Sync + std::fmt::Debug {
/// Returns the feed's current watermark.
fn watermark(&self) -> CompletionSeq;
/// Returns a batch of entries relative to `after_seq`, plus a watermark.
// NOTE(review): the parameter name suggests an exclusive lower bound
// (entries with `seq > after_seq`); the contract is not stated in the trait
// itself — confirm with implementors.
fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch;
/// Returns a boxed, `Send` future that resolves to a [`CompletionSeq`].
///
/// The future may borrow from `self` (its lifetime is tied to `&self`).
// NOTE(review): presumably resolves once the watermark has moved past
// `after_seq`, yielding the new watermark — not specified here, confirm.
fn wait_for_advance(
&self,
after_seq: CompletionSeq,
) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>>;
}
/// Extra data returned by a [`CompletionEnrichmentProvider`] for an operation.
#[derive(Debug, Clone)]
pub struct CompletionEnrichmentData {
/// Job identifier, as a string.
// NOTE(review): how this relates to `OperationId` is not visible here.
pub job_id: String,
/// Free-form detail text.
pub detail: String,
}
/// Looks up optional [`CompletionEnrichmentData`] for an operation.
///
/// Implementors must be `Send + Sync`.
pub trait CompletionEnrichmentProvider: Send + Sync {
/// Returns enrichment data for `operation_id`, or `None`.
// NOTE(review): `None` presumably means "nothing known for this operation";
// confirm with implementors.
fn enrich(&self, operation_id: &OperationId) -> Option<CompletionEnrichmentData>;
}
/// Lets an `Arc<T>` (including `Arc<dyn CompletionFeed>`, thanks to `?Sized`)
/// be used wherever a [`CompletionFeed`] is expected.
///
/// Every method forwards unchanged to the value behind the `Arc`.
impl<T: CompletionFeed + ?Sized> CompletionFeed for Arc<T> {
    /// Forwards to the inner feed's `watermark`.
    fn watermark(&self) -> CompletionSeq {
        T::watermark(&**self)
    }

    /// Forwards to the inner feed's `list_since`.
    fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
        T::list_since(&**self, after_seq)
    }

    /// Forwards to the inner feed's `wait_for_advance`; the returned future
    /// borrows the inner feed for as long as `self` is borrowed.
    fn wait_for_advance(
        &self,
        after_seq: CompletionSeq,
    ) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
        T::wait_for_advance(&**self, after_seq)
    }
}
// Test-support module. It is `pub(crate)` and its helpers are `pub`, so other
// test modules in the crate can reach `MockCompletionFeed`.
#[cfg(test)]
#[allow(clippy::unwrap_used, dead_code)]
pub(crate) mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// In-memory [`CompletionFeed`] for tests.
    ///
    /// The watermark lives in an atomic and the entries in a mutex-guarded
    /// vector; the two are only updated together inside [`Self::push`].
    #[derive(Debug)]
    pub struct MockCompletionFeed {
        watermark: AtomicU64,
        entries: std::sync::Mutex<Vec<CompletionEntry>>,
    }

    impl MockCompletionFeed {
        /// Creates an empty feed whose watermark is `0`.
        pub fn new() -> Self {
            Self::with_watermark(0)
        }

        /// Creates an empty feed whose watermark starts at `watermark`.
        pub fn with_watermark(watermark: CompletionSeq) -> Self {
            let entries = std::sync::Mutex::new(Vec::new());
            let watermark = AtomicU64::new(watermark);
            Self { watermark, entries }
        }

        /// Appends `entry` and sets the watermark to `entry.seq`.
        ///
        /// The watermark is overwritten unconditionally: pushing an entry
        /// with a lower `seq` than the current watermark moves it backwards.
        pub fn push(&self, entry: CompletionEntry) {
            // Both updates happen while the entries lock is held, so
            // `list_since` never observes one without the other.
            let mut stored = self.entries.lock().unwrap();
            let seq = entry.seq;
            self.watermark.store(seq, Ordering::Release);
            stored.push(entry);
        }
    }

    impl CompletionFeed for MockCompletionFeed {
        /// Reads the current watermark.
        fn watermark(&self) -> CompletionSeq {
            self.watermark.load(Ordering::Acquire)
        }

        /// Returns clones of all stored entries with `seq > after_seq`, in
        /// insertion order, plus the watermark read under the same lock.
        fn list_since(&self, after_seq: CompletionSeq) -> CompletionBatch {
            let stored = self.entries.lock().unwrap();
            let mut newer = Vec::new();
            for candidate in stored.iter() {
                if candidate.seq > after_seq {
                    newer.push(candidate.clone());
                }
            }
            let watermark = self.watermark.load(Ordering::Acquire);
            CompletionBatch {
                entries: newer,
                watermark,
            }
        }

        /// Returns a future that resolves immediately to the watermark as
        /// read when the future is polled; `_after_seq` is ignored and no
        /// waiting takes place.
        fn wait_for_advance(
            &self,
            _after_seq: CompletionSeq,
        ) -> Pin<Box<dyn Future<Output = CompletionSeq> + Send + '_>> {
            let current = async move { CompletionFeed::watermark(self) };
            Box::pin(current)
        }
    }

    #[test]
    fn mock_feed_with_watermark_seeds_at_expected_value() {
        let seeded = MockCompletionFeed::with_watermark(42);
        assert_eq!(seeded.watermark(), 42);

        // Seeding the watermark must not fabricate any entries.
        let batch = seeded.list_since(0);
        assert!(batch.entries.is_empty());
    }
}