#![cfg(feature = "async")]
use crate::colony::{Colony, ColonyEvent, ColonyStats, ColonySnapshot};
use phago_core::types::{DocumentId, Position};
use std::cell::RefCell;
use std::rc::Rc;
use tokio::task::JoinHandle;
pub struct AsyncColony {
colony: Rc<RefCell<Colony>>,
}
impl AsyncColony {
pub fn new(colony: Colony) -> Self {
Self {
colony: Rc::new(RefCell::new(colony)),
}
}
pub fn inner(&self) -> Rc<RefCell<Colony>> {
Rc::clone(&self.colony)
}
pub fn into_inner(self) -> Colony {
match Rc::try_unwrap(self.colony) {
Ok(cell) => cell.into_inner(),
Err(_) => panic!("Cannot unwrap AsyncColony: other references exist"),
}
}
pub async fn ingest_document(
&self,
title: &str,
content: &str,
position: Position,
) -> DocumentId {
let id = self.colony.borrow_mut().ingest_document(title, content, position);
tokio::task::yield_now().await;
id
}
pub async fn ingest_documents_parallel(
&self,
documents: &[(&str, &str, Position)],
) -> Vec<DocumentId> {
let mut ids = Vec::with_capacity(documents.len());
for (title, content, position) in documents {
let id = self.colony.borrow_mut().ingest_document(title, content, *position);
ids.push(id);
tokio::task::yield_now().await;
}
ids
}
pub async fn tick_async(&self) -> Vec<ColonyEvent> {
let events = self.colony.borrow_mut().tick();
tokio::task::yield_now().await;
events
}
pub async fn run_async(&self, ticks: u64) -> Vec<Vec<ColonyEvent>> {
let mut all_events = Vec::with_capacity(ticks as usize);
for _ in 0..ticks {
let events = self.colony.borrow_mut().tick();
all_events.push(events);
tokio::task::yield_now().await;
}
all_events
}
pub async fn run_with_callback<F>(
&self,
ticks: u64,
mut callback: F,
) -> Vec<Vec<ColonyEvent>>
where
F: FnMut(u64, &[ColonyEvent]) -> bool,
{
let mut all_events = Vec::with_capacity(ticks as usize);
for tick in 0..ticks {
let events = self.colony.borrow_mut().tick();
let should_continue = callback(tick, &events);
all_events.push(events);
if !should_continue {
break;
}
tokio::task::yield_now().await;
}
all_events
}
pub fn stats(&self) -> ColonyStats {
self.colony.borrow().stats()
}
pub fn snapshot(&self) -> ColonySnapshot {
self.colony.borrow().snapshot()
}
pub fn alive_count(&self) -> usize {
self.colony.borrow().alive_count()
}
}
pub fn spawn_simulation_local(
colony: Rc<RefCell<Colony>>,
ticks: u64,
) -> JoinHandle<Vec<Vec<ColonyEvent>>> {
tokio::task::spawn_local(async move {
let mut all_events = Vec::with_capacity(ticks as usize);
for _ in 0..ticks {
let events = colony.borrow_mut().tick();
all_events.push(events);
tokio::task::yield_now().await;
}
all_events
})
}
pub async fn batch_ingest(
colony: Rc<RefCell<Colony>>,
documents: Vec<(String, String, Position)>,
batch_size: usize,
) -> Vec<DocumentId> {
let mut ids = Vec::with_capacity(documents.len());
for batch in documents.chunks(batch_size) {
for (title, content, position) in batch {
let id = colony.borrow_mut().ingest_document(title, content, *position);
ids.push(id);
}
tokio::task::yield_now().await;
}
ids
}
pub struct TickTimer {
interval: tokio::time::Interval,
}
impl TickTimer {
pub fn new(interval_ms: u64) -> Self {
Self {
interval: tokio::time::interval(tokio::time::Duration::from_millis(interval_ms)),
}
}
pub async fn tick(&mut self) {
self.interval.tick().await;
}
pub async fn run_timed(
&mut self,
colony: &AsyncColony,
ticks: u64,
) -> Vec<Vec<ColonyEvent>> {
let mut all_events = Vec::with_capacity(ticks as usize);
for _ in 0..ticks {
self.tick().await;
let events = colony.tick_async().await;
all_events.push(events);
}
all_events
}
}
pub async fn run_in_local<F, Fut, T>(colony: Colony, f: F) -> T
where
F: FnOnce(AsyncColony) -> Fut,
Fut: std::future::Future<Output = T>,
{
let local = tokio::task::LocalSet::new();
local.run_until(async move {
let async_colony = AsyncColony::new(colony);
f(async_colony).await
}).await
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::task::LocalSet;
async fn run_test<F, Fut>(f: F)
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = ()>,
{
let local = LocalSet::new();
local.run_until(f()).await;
}
#[tokio::test]
async fn async_colony_basic_operations() {
run_test(|| async {
let colony = Colony::new();
let async_colony = AsyncColony::new(colony);
let _doc_id = async_colony
.ingest_document("Test", "Cell membrane protein", Position::new(0.0, 0.0))
.await;
let stats = async_colony.stats();
assert_eq!(stats.documents_total, 1);
let events = async_colony.run_async(5).await;
assert_eq!(events.len(), 5);
}).await;
}
#[tokio::test]
async fn async_colony_parallel_documents() {
run_test(|| async {
let colony = Colony::new();
let async_colony = AsyncColony::new(colony);
let docs = vec![
("Doc 1", "Content about cells", Position::new(0.0, 0.0)),
("Doc 2", "Content about proteins", Position::new(5.0, 0.0)),
("Doc 3", "Content about membranes", Position::new(10.0, 0.0)),
];
let ids = async_colony.ingest_documents_parallel(&docs).await;
assert_eq!(ids.len(), 3);
let stats = async_colony.stats();
assert_eq!(stats.documents_total, 3);
}).await;
}
#[tokio::test]
async fn async_colony_with_callback() {
run_test(|| async {
let mut colony = Colony::new();
colony.ingest_document("Test", "Cell membrane", Position::new(0.0, 0.0));
let async_colony = AsyncColony::new(colony);
let mut tick_count = 0;
let events = async_colony
.run_with_callback(10, |tick, _events| {
tick_count = tick + 1;
tick < 5 })
.await;
assert_eq!(events.len(), 6); assert_eq!(tick_count, 6);
}).await;
}
#[tokio::test]
async fn spawn_simulation_local_works() {
let local = LocalSet::new();
local.run_until(async {
let mut colony = Colony::new();
colony.ingest_document("Test", "Content", Position::new(0.0, 0.0));
let rc = Rc::new(RefCell::new(colony));
let handle = spawn_simulation_local(Rc::clone(&rc), 10);
let events = handle.await.unwrap();
assert_eq!(events.len(), 10);
}).await;
}
#[tokio::test]
async fn batch_ingest_works() {
run_test(|| async {
let colony = Colony::new();
let rc = Rc::new(RefCell::new(colony));
let docs: Vec<_> = (0..10)
.map(|i| (format!("Doc {}", i), format!("Content {}", i), Position::new(i as f64, 0.0)))
.collect();
let ids = batch_ingest(Rc::clone(&rc), docs, 3).await;
assert_eq!(ids.len(), 10);
let stats = rc.borrow().stats();
assert_eq!(stats.documents_total, 10);
}).await;
}
#[tokio::test]
async fn tick_timer_controlled_rate() {
run_test(|| async {
let colony = Colony::new();
let async_colony = AsyncColony::new(colony);
let mut timer = TickTimer::new(10); let start = tokio::time::Instant::now();
let events = timer.run_timed(&async_colony, 5).await;
let elapsed = start.elapsed();
assert_eq!(events.len(), 5);
assert!(elapsed.as_millis() >= 40);
}).await;
}
#[tokio::test]
async fn run_in_local_convenience() {
let colony = Colony::new();
let events = run_in_local(colony, |async_colony| async move {
async_colony.run_async(5).await
}).await;
assert_eq!(events.len(), 5);
}
}