use std::sync::Arc;
use actix::{Arbiter, System};
use eyre::{OptionExt, WrapErr};
use futures_util::{stream, StreamExt};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
/// Boxed, pinned stream of arbiter handles that can be moved across threads.
type ArbiterStream =
    std::pin::Pin<Box<dyn futures_util::Stream<Item = actix::ArbiterHandle> + Send>>;

/// Hands out [`actix::ArbiterHandle`]s produced by an actix `System` that runs
/// on a dedicated blocking Tokio task.
pub struct ArbiterPool {
    /// Source of arbiter handles, guarded by an async mutex so the stream can
    /// be polled while the lock is held across an `.await`.
    stream: Arc<tokio::sync::Mutex<ArbiterStream>>,
    /// Join handle of the blocking task running the actix system; it resolves
    /// to the outcome of running that system.
    pub system_handle: JoinHandle<eyre::Result<()>>,
}
impl ArbiterPool {
pub async fn new() -> eyre::Result<Self> {
let (tx, mut rx) = mpsc::channel(1);
let system_handle = tokio::task::spawn_blocking(move || {
let system = System::new();
let _ignored = system.runtime().spawn({
let task = async move {
let mut arb = Arbiter::current();
loop {
tx.send(Some(arb)).await?;
tx.send(None).await?;
tx.send(None).await?;
arb = Arbiter::new().handle();
}
};
async {
let _ignored: eyre::Result<()> = task.await;
System::current().stop();
}
});
system
.run()
.wrap_err("the actix subsystem ran into an error")
});
let stream = Box::pin(stream::poll_fn(move |cx| rx.poll_recv(cx)).filter_map(async |t| t));
Ok(Self {
stream: Arc::new(tokio::sync::Mutex::new(stream)),
system_handle,
})
}
pub async fn get(&mut self) -> eyre::Result<actix::ArbiterHandle> {
let mut stream = self.stream.lock().await;
stream.next().await.ok_or_eyre("failed to get arbiter")
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing a pool must succeed.
    #[tokio::test]
    async fn test_arbiter_pool_creation() {
        let created = ArbiterPool::new().await;
        assert!(created.is_ok(), "Failed to create arbiter pool");
    }

    /// Two consecutive `get` calls must each yield a handle.
    #[tokio::test]
    async fn test_get_multiple_arbiters() {
        let mut pool = ArbiterPool::new().await.unwrap();

        // Both handles are requested before either result is checked.
        let outcomes = [
            (pool.get().await, "Failed to get first arbiter"),
            (pool.get().await, "Failed to get second arbiter"),
        ];

        for (outcome, message) in outcomes {
            assert!(outcome.is_ok(), "{message}");
        }
    }
}