use super::messages::CacheMessage;
use std::collections::LinkedList;
use tokio::sync::mpsc;
pub async fn caching_actor<T: Clone>(mut rx: mpsc::Receiver<CacheMessage<T>>) {
let mut cache: LinkedList<T> = LinkedList::new();
loop {
let message = match rx.recv().await {
Some(message) => message,
None => continue,
};
match message {
CacheMessage::Put(message) => {
cache.push_back(message);
}
CacheMessage::Flush(sender) => {
for message in cache.into_iter() {
match sender.send(message).await {
Ok(_) => {}
Err(e) => {
tracing::error!("error when sending message from cache: {}", e);
}
}
}
break;
} }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_caching_actor() {
let (tx, rx) = mpsc::channel::<CacheMessage<i32>>(5);
let (tx2, mut rx2) = mpsc::channel::<i32>(5);
let caching_handle = tokio::spawn(async move { caching_actor(rx).await });
let message = CacheMessage::Put(1);
tx.send(message).await.unwrap();
let message = CacheMessage::Put(2);
tx.send(message).await.unwrap();
let message = CacheMessage::Put(3);
tx.send(message).await.unwrap();
let message = CacheMessage::Put(4);
tx.send(message).await.unwrap();
let message = CacheMessage::Put(5);
tx.send(message).await.unwrap();
let message = CacheMessage::Flush(tx2);
tx.send(message).await.unwrap();
let message = rx2.recv().await.unwrap();
assert_eq!(message, 1);
let message = rx2.recv().await.unwrap();
assert_eq!(message, 2);
let message = rx2.recv().await.unwrap();
assert_eq!(message, 3);
let message = rx2.recv().await.unwrap();
assert_eq!(message, 4);
let message = rx2.recv().await.unwrap();
assert_eq!(message, 5);
caching_handle.await.unwrap();
}
}