use std::sync::Arc;
use tokio::task::JoinHandle;
use crate::{Bot, JoinDelay, Swarm, bot::handlers::Handlers};
/// A group of [`Swarm`]s that share one set of [`Handlers`] and can be
/// launched, iterated over and shut down together.
pub struct Cluster {
/// Swarms registered in this cluster, in insertion order; a swarm's index
/// in this vector is the id accepted by `get_swarm` and `launch_swarm`.
swarms: Vec<Arc<Swarm>>,
/// Join handles returned by `Swarm::quiet_launch` for every launched swarm.
handles: Vec<JoinHandle<()>>,
/// Handlers passed (as a cloned `Arc`) to every swarm created by this cluster.
shared_handlers: Arc<Handlers>,
}
impl Cluster {
pub fn create() -> Self {
Self {
swarms: Vec::new(),
handles: Vec::new(),
shared_handlers: Arc::new(Handlers::new()),
}
}
/// Creates an empty cluster whose swarm and handle vectors are both
/// pre-allocated for `capacity` entries.
pub fn create_with_capacity(capacity: usize) -> Self {
    let swarms = Vec::with_capacity(capacity);
    let handles = Vec::with_capacity(capacity);
    let shared_handlers = Arc::new(Handlers::new());
    Self { swarms, handles, shared_handlers }
}
/// Replaces the cluster's shared handlers and returns the cluster (builder style).
///
/// Swarms that were added before this call keep the `Arc` they were given
/// at creation time; only swarms added afterwards receive `handlers`.
pub fn with_handlers(mut self, handlers: Handlers) -> Self {
    let replacement = Arc::new(handlers);
    self.shared_handlers = replacement;
    self
}
/// Builds a new swarm from `bots` and appends it to the cluster.
///
/// The swarm is configured with `join_delay` (via `with_join_delay`), a clone
/// of the cluster's current shared handlers, and is bound to
/// `target_host`/`target_port` (via `bind`). Its id is its position in the
/// cluster, i.e. the number of swarms added before it.
pub fn add_swarm(&mut self, bots: Vec<Bot>, join_delay: JoinDelay, target_host: impl Into<String>, target_port: u16) {
    let handlers = Arc::clone(&self.shared_handlers);
    let mut swarm = Swarm::create()
        .with_join_delay(join_delay)
        .with_shared_handlers(handlers)
        .bind(target_host, target_port);
    bots.into_iter().for_each(|bot| {
        swarm.add_bot(bot);
    });
    // The swarm is frozen behind an `Arc` once all bots have been added.
    self.swarms.push(Arc::new(swarm));
}
pub fn with_swarm(mut self, bots: Vec<Bot>, join_delay: JoinDelay, target_host: impl Into<String>, target_port: u16) -> Self {
let mut swarm = Swarm::create()
.with_join_delay(join_delay)
.with_shared_handlers(Arc::clone(&self.shared_handlers))
.bind(target_host, target_port);
for bot in bots {
swarm.add_bot(bot);
}
self.swarms.push(Arc::new(swarm));
self
}
/// Returns a new `Arc` handle to the swarm at index `swarm_id`,
/// or `None` if no swarm has that index.
pub fn get_swarm(&self, swarm_id: usize) -> Option<Arc<Swarm>> {
    // `Arc::clone` only bumps the reference count; the swarm itself is shared.
    self.swarms.get(swarm_id).map(Arc::clone)
}
/// Returns new `Arc` handles to every swarm in the cluster, in insertion order.
pub fn get_all_swarms(&self) -> Vec<Arc<Swarm>> {
    self.swarms.iter().map(Arc::clone).collect()
}
/// Calls `quiet_launch` on every swarm, in insertion order, and stores the
/// returned join handles so they can later be awaited or aborted.
///
/// Every call launches all swarms again, including ones launched earlier.
pub fn launch(&mut self) {
    let launched = self.swarms.iter().map(|swarm| swarm.quiet_launch());
    self.handles.extend(launched);
}
/// Launches every swarm (see `launch`) and then waits for all stored
/// handles to finish (see `wait_finish`).
///
/// # Errors
///
/// Returns the error produced by `wait_finish` for the first handle that
/// fails to join.
pub async fn launch_and_wait(&mut self) -> std::io::Result<()> {
    // Reuse `launch` instead of duplicating its loop so both stay in sync.
    self.launch();
    self.wait_finish().await
}
pub fn launch_swarm(&mut self, swarm_id: i32) {
for (id, swarm) in self.swarms.iter().enumerate() {
if id as i32 != swarm_id {
continue;
}
self.handles.push(swarm.quiet_launch());
}
}
/// Runs `f` on every swarm one after another, in insertion order,
/// awaiting each returned future before starting the next.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by `f`; the remaining
/// swarms are not visited.
pub async fn for_each_consistent<F, O>(&self, f: F) -> std::io::Result<()>
where
    F: Fn(Arc<Swarm>) -> O + Send + Sync + 'static,
    O: std::future::Future<Output = std::io::Result<()>> + Send + 'static,
{
    for swarm in self.swarms.iter() {
        let swarm = Arc::clone(swarm);
        f(swarm).await?;
    }
    Ok(())
}
/// Calls `f` on every swarm and hands each returned future to
/// `tokio::spawn`, without waiting for any of them.
///
/// The spawned tasks' join handles are dropped, so the `io::Result` each
/// future produces is not observed by the caller.
pub fn for_each_parallel<F, O>(&self, f: F)
where
    F: Fn(Arc<Swarm>) -> O + Send + Sync + 'static,
    O: std::future::Future<Output = std::io::Result<()>> + Send + 'static,
{
    for swarm in &self.swarms {
        let task = f(Arc::clone(swarm));
        tokio::spawn(task);
    }
}
/// Runs `f` on every bot of every swarm one after another (swarms in
/// insertion order, bots in each swarm's own order), awaiting each returned
/// future before starting the next.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by `f`; the remaining
/// bots are not visited.
pub async fn for_each_bots_consistent<F, O>(&self, f: F) -> std::io::Result<()>
where
    F: Fn(Arc<Bot>) -> O + Send + Sync + 'static,
    O: std::future::Future<Output = std::io::Result<()>> + Send + 'static,
{
    // `f` is only called from this future, so it needs no `Arc` wrapper.
    for swarm in &self.swarms {
        for bot in &swarm.bots {
            f(Arc::clone(bot)).await?;
        }
    }
    Ok(())
}
/// Calls `f` on every bot of every swarm and hands each returned future to
/// `tokio::spawn`, without waiting for any of them.
///
/// The spawned tasks' join handles are dropped, so the `io::Result` each
/// future produces is not observed by the caller.
pub fn for_each_bots_parallel<F, O>(&self, f: F)
where
    F: Fn(Arc<Bot>) -> O + Send + Sync + 'static,
    O: std::future::Future<Output = std::io::Result<()>> + Send + 'static,
{
    for swarm in &self.swarms {
        for bot in &swarm.bots {
            let task = f(Arc::clone(bot));
            tokio::spawn(task);
        }
    }
}
/// Awaits every stored join handle in the order the swarms were launched.
///
/// Each handle is removed from the cluster as soon as it has been awaited,
/// so calling this method again (or after `launch_and_wait`) never polls an
/// already-completed `JoinHandle`. If this future is dropped while waiting,
/// the handle being awaited and all later ones stay stored and can still be
/// awaited or aborted.
///
/// # Errors
///
/// Returns the first join error, converted into `std::io::Error`. Handles
/// after the failing one are left stored for a later call.
pub async fn wait_finish(&mut self) -> std::io::Result<()> {
    while let Some(handle) = self.handles.first_mut() {
        let outcome = handle.await;
        // Drop the consumed handle before propagating any error so it is
        // never polled a second time. The vector holds one handle per
        // launched swarm, so the front removal cost is negligible.
        drop(self.handles.remove(0));
        outcome?;
    }
    Ok(())
}
pub async fn shutdown(&mut self) {
self.abort_handles();
self.swarms.clear();
self.handles.clear();
}
/// Calls `abort` on every stored handle that has not finished yet.
///
/// The handles themselves stay stored in the cluster.
pub fn abort_handles(&self) {
    self.handles
        .iter()
        .filter(|handle| !handle.is_finished())
        .for_each(|handle| handle.abort());
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::{
Bot, Cluster, JoinDelay,
bot::{BotChatExt, handlers::Handlers},
};
#[tokio::test]
async fn test_minimal_cluster() -> std::io::Result<()> {
let mut cluster = Cluster::create();
for si in 0..3 {
let mut bots = Vec::new();
for bi in 0..2 {
bots.push(Bot::create(format!("nurtex_{}_{}", si, bi)));
}
cluster.add_swarm(bots, JoinDelay::fixed(5000), "localhost", 25565);
}
cluster.launch();
tokio::time::sleep(Duration::from_secs(5)).await;
cluster.wait_finish().await
}
#[tokio::test]
async fn test_for_each_bots() -> std::io::Result<()> {
let mut cluster = Cluster::create();
for si in 0..3 {
let mut bots = Vec::new();
for bi in 0..2 {
bots.push(Bot::create(format!("nurtex_{}_{}", si, bi)));
}
cluster.add_swarm(bots, JoinDelay::fixed(5000), "localhost", 25565);
}
cluster.launch();
tokio::time::sleep(Duration::from_secs(6)).await;
cluster.for_each_bots_parallel(async |bot| bot.chat_message("Параллельный for-each").await);
tokio::time::sleep(Duration::from_secs(2)).await;
cluster
.for_each_bots_consistent(async |bot| {
tokio::time::sleep(Duration::from_millis(250)).await;
bot.chat_message("Последовательный for-each").await
})
.await?;
cluster.wait_finish().await
}
#[tokio::test]
async fn test_shared_handlers() -> std::io::Result<()> {
let mut handlers = Handlers::new();
handlers.on_login(async |username| {
println!("Бот {} залогинился", username);
Ok(())
});
handlers.on_spawn(async |username| {
println!("Бот {} заспавнился", username);
Ok(())
});
handlers.on_chat(async |username, payload| {
println!("Бот {} получил сообщение: {}", username, payload.message);
Ok(())
});
handlers.on_disconnect(async |username, payload| {
println!("Бот {} отключился в состоянии: {:?}", username, payload.state);
Ok(())
});
let mut cluster = Cluster::create().with_handlers(handlers);
for si in 0..3 {
let mut bots = Vec::new();
for bi in 0..10 {
bots.push(Bot::create(format!("nurtex_{}_{}", si, bi)));
}
cluster.add_swarm(bots, JoinDelay::fixed(5000), "localhost", 25565);
}
cluster.launch_and_wait().await
}
}