use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::sync::Mutex;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::{Error, Message, input::Input};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryInputConfig {
pub messages: Option<Vec<String>>,
}
pub struct MemoryInput {
queue: Arc<Mutex<VecDeque<Message>>>,
connected: AtomicBool,
}
impl MemoryInput {
pub fn new(config: &MemoryInputConfig) -> Result<Self, Error> {
let mut queue = VecDeque::new();
if let Some(messages) = &config.messages {
for msg_str in messages {
queue.push_back(Message::from_string(msg_str));
}
}
Ok(Self {
queue: Arc::new(Mutex::new(queue)),
connected: AtomicBool::new(false),
})
}
pub async fn push(&self, msg: Message) -> Result<(), Error> {
let mut queue = self.queue.lock().await;
queue.push_back(msg);
Ok(())
}
}
#[async_trait]
impl Input for MemoryInput {
async fn connect(&self) -> Result<(), Error> {
self.connected.store(true, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
async fn read(&self) -> Result<Message, Error> {
if !self.connected.load(std::sync::atomic::Ordering::SeqCst) {
return Err(Error::Connection("输入未连接".to_string()));
}
let msg_option;
{
let mut queue = self.queue.lock().await;
msg_option = queue.pop_front();
}
if let Some(msg) = msg_option {
Ok(msg)
} else {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
Err(Error::Done)
}
}
async fn acknowledge(&self, _msg: &Message) -> Result<(), Error> {
Ok(())
}
async fn close(&self) -> Result<(), Error> {
self.connected.store(false, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
}