surrealcs 0.4.4

The SurrealCS client code for SurrealDB
Documentation
//! Defines the caching actor for stashing pending transactions
use super::messages::CacheMessage;
use std::collections::LinkedList;
use tokio::sync::mpsc;

/// The caching actor that stashes pending transactions which can be flushed when the connection is re-established.
///
/// The actor keeps every `CacheMessage::Put` payload in insertion order. It finishes in one of
/// two ways:
/// * a `CacheMessage::Flush` arrives: every stashed message is forwarded through the supplied
///   sender (oldest first) and the actor returns;
/// * the channel behind `rx` is closed (all senders dropped): nobody can request a flush any
///   more, so the stashed messages are dropped and the actor returns.
///
/// A failure to forward a message during a flush is logged and does not abort the flush.
///
/// # Arguments
/// * `rx`: The receiver for the caching actor
pub async fn caching_actor<T: Clone>(mut rx: mpsc::Receiver<CacheMessage<T>>) {
	let mut cache: LinkedList<T> = LinkedList::new();

	// `recv` yields `None` once the channel is closed and drained, and it keeps yielding `None`
	// on every later call. Polling again would spin forever, so the loop ends here instead.
	while let Some(message) = rx.recv().await {
		match message {
			CacheMessage::Put(message) => {
				// cache the message
				cache.push_back(message);
			}
			CacheMessage::Flush(sender) => {
				// flush the cache, oldest message first
				for message in cache {
					if let Err(e) = sender.send(message).await {
						tracing::error!("error when sending message from cache: {}", e);
					}
				}
				// the cache has been consumed, so the actor's job is done
				return;
			} // TODO => maybe add a kill message
		}
	}
}

#[cfg(test)]
mod tests {

	use super::*;

	/// Stashes the values 1 through 5 in the actor, asks for a flush, and checks that the
	/// values come back in the order they were put and that the actor task then completes.
	#[tokio::test]
	async fn test_caching_actor() {
		let (cache_tx, cache_rx) = mpsc::channel::<CacheMessage<i32>>(5);
		let (flush_tx, mut flush_rx) = mpsc::channel::<i32>(5);

		let actor = tokio::spawn(caching_actor(cache_rx));

		// stash five values, then request the flush
		for value in 1..=5 {
			cache_tx.send(CacheMessage::Put(value)).await.unwrap();
		}
		cache_tx.send(CacheMessage::Flush(flush_tx)).await.unwrap();

		// the flushed values must arrive in insertion order
		for expected in 1..=5 {
			let received = flush_rx.recv().await.unwrap();
			assert_eq!(received, expected);
		}

		// the actor stops after a flush, so joining it must succeed
		actor.await.unwrap();
	}
}