use std::sync::Arc;
use std::time::Duration;
use nurtex_registry::BlockKind;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::bot::Bot;
use crate::bot::handlers::Handlers;
use crate::protocol::types::BlockPos;
use crate::storage::Storage;
use crate::swarm::{JoinDelay, TargetServer};
use crate::world::Entity;
#[cfg(feature = "random")]
use crate::random::generate_username;
pub struct Swarm {
pub bots: Vec<Arc<Bot>>,
target_server: Arc<RwLock<TargetServer>>,
join_delay: Arc<JoinDelay>,
handles: Vec<JoinHandle<core::result::Result<(), std::io::Error>>>,
shared_storage: Arc<Storage>,
shared_handlers: Arc<Handlers>,
}
impl Swarm {
pub fn create() -> Self {
Self {
bots: Vec::new(),
target_server: Arc::new(RwLock::new(TargetServer::default())),
join_delay: Arc::new(JoinDelay::fixed(1000)),
handles: Vec::new(),
shared_storage: Arc::new(Storage::null()),
shared_handlers: Arc::new(Handlers::new()),
}
}
#[cfg(feature = "random")]
pub fn create_random(bots_count: usize) -> Self {
use rand::Rng;
let mut swarm = Self::create();
for _ in 0..bots_count {
let random_username = loop {
let username = generate_username(rand::thread_rng().gen_range(5..=14));
if swarm.username_is_unique(&username) {
break username;
}
};
swarm.add_bot(Bot::create(random_username));
}
swarm
}
pub fn create_with_capacity(capacity: usize) -> Self {
Self {
bots: Vec::with_capacity(capacity),
target_server: Arc::new(RwLock::new(TargetServer::default())),
join_delay: Arc::new(JoinDelay::fixed(1000)),
handles: Vec::with_capacity(capacity),
shared_storage: Arc::new(Storage::null()),
shared_handlers: Arc::new(Handlers::new()),
}
}
pub fn with_join_delay(mut self, join_delay: JoinDelay) -> Self {
self.join_delay = Arc::new(join_delay);
self
}
pub fn with_handlers(mut self, handlers: Handlers) -> Self {
self.shared_handlers = Arc::new(handlers);
self
}
pub fn with_shared_handlers(mut self, handlers: Arc<Handlers>) -> Self {
self.shared_handlers = handlers;
self
}
pub fn bind(self, server_host: impl Into<String>, server_port: u16) -> Self {
match self.target_server.try_write() {
Ok(mut guard) => {
guard.host = server_host.into();
guard.port = server_port;
}
Err(_) => {}
};
self
}
pub async fn rebind(&self, server_host: impl Into<String>, server_port: u16) {
let mut guard = self.target_server.write().await;
guard.host = server_host.into();
guard.port = server_port;
}
pub fn get_shared_storage(&self) -> Arc<Storage> {
Arc::clone(&self.shared_storage)
}
pub async fn for_each_consistent<F, Fut>(&self, f: F) -> std::io::Result<()>
where
F: Fn(Arc<Bot>) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = std::io::Result<()>> + Send + 'static,
{
for bot in &self.bots {
f(Arc::clone(bot)).await?;
}
Ok(())
}
pub fn for_each_parallel<F, Fut>(&self, f: F)
where
F: Fn(Arc<Bot>) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = std::io::Result<()>> + Send + 'static,
{
self.bots.iter().for_each(|bot| {
tokio::spawn(f(Arc::clone(&bot)));
});
}
pub fn add_bot(&mut self, bot: Bot) {
let swarm_bot = bot
.set_shared_storage(Arc::clone(&self.shared_storage))
.set_shared_handlers(Arc::clone(&self.shared_handlers));
self.bots.push(Arc::new(swarm_bot));
}
pub fn add_bots(&mut self, bots: Vec<Bot>) {
for bot in bots {
let swarm_bot = bot
.set_shared_storage(Arc::clone(&self.shared_storage))
.set_shared_handlers(Arc::clone(&self.shared_handlers));
self.bots.push(Arc::new(swarm_bot));
}
}
pub fn with_bot(mut self, bot: Bot) -> Self {
let swarm_bot = bot
.set_shared_storage(Arc::clone(&self.shared_storage))
.set_shared_handlers(Arc::clone(&self.shared_handlers));
self.bots.push(Arc::new(swarm_bot));
self
}
pub fn with_bots(mut self, bots: Vec<Bot>) -> Self {
for bot in bots {
let swarm_bot = bot
.set_shared_storage(Arc::clone(&self.shared_storage))
.set_shared_handlers(Arc::clone(&self.shared_handlers));
self.bots.push(Arc::new(swarm_bot));
}
self
}
pub fn username_is_unique(&self, username: &str) -> bool {
for bot in &self.bots {
if username == bot.username() {
return false;
}
}
true
}
pub async fn launch(&mut self) {
let total_bots = self.bots.len();
let (host, port) = {
let guard = self.target_server.read().await;
(guard.host.clone(), guard.port)
};
for (index, bot) in self.bots.iter().enumerate() {
let handle = bot.connect_with_handle(&host, port);
self.handles.push(handle);
let delay = self.join_delay.activate(index, total_bots);
if index < total_bots - 1 {
tokio::time::sleep(Duration::from_millis(delay)).await;
}
}
}
pub async fn launch_and_wait(&mut self) {
self.launch().await;
self.wait_handles().await;
}
pub fn instant_launch(&mut self) {
let (host, port) = match self.target_server.try_read() {
Ok(g) => (g.host.clone(), g.port),
Err(_) => ("localhost".to_string(), 25565),
};
for bot in &self.bots {
let handle = bot.connect_with_handle(&host, port);
self.handles.push(handle);
}
}
pub fn quiet_launch(&self) -> JoinHandle<()> {
let total_bots = self.bots.len();
let join_delay = Arc::clone(&self.join_delay);
let bots = {
let mut vec = Vec::new();
for bot in &self.bots {
vec.push(Arc::clone(bot));
}
vec
};
let (host, port) = match self.target_server.try_read() {
Ok(g) => (g.host.clone(), g.port),
Err(_) => ("localhost".to_string(), 25565),
};
tokio::spawn(async move {
let mut handles = Vec::with_capacity(total_bots);
for (index, bot) in bots.iter().enumerate() {
let handle = bot.connect_with_handle(&host, port);
handles.push(handle);
let delay = join_delay.activate(index, total_bots);
if index < total_bots - 1 {
tokio::time::sleep(Duration::from_millis(delay)).await;
}
}
for handle in handles {
let _ = handle.await;
}
})
}
pub fn bots_count(&self) -> usize {
self.bots.len()
}
pub fn handles_count(&self) -> usize {
self.handles.len()
}
pub fn is_null(&self) -> bool {
self.bots.is_empty()
}
pub fn get_bot_usernames(&self) -> Vec<String> {
self.bots.iter().map(|bot| bot.username().to_string()).collect()
}
pub async fn shutdown(&mut self) -> std::io::Result<()> {
self.abort_handles();
tokio::time::sleep(Duration::from_millis(100)).await;
for bot in &self.bots {
bot.shutdown().await?;
}
self.handles.clear();
self.bots.clear();
self.shared_storage.clear().await;
Ok(())
}
pub fn abort_handles(&self) {
for handle in &self.handles {
handle.abort();
}
}
pub async fn wait_handles(&mut self) {
for handle in &mut self.handles {
if !handle.is_finished() {
let _ = handle.await;
}
}
}
pub async fn get_entity(&self, id: &i32) -> Option<Entity> {
self.shared_storage.get_entity(id).await
}
pub async fn get_block(&self, pos: BlockPos) -> Option<BlockKind> {
self.shared_storage.get_block(pos).await
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::bot::Bot;
use crate::bot::handlers::Handlers;
use crate::swarm::{JoinDelay, Swarm};
#[tokio::test]
async fn test_instant() -> std::io::Result<()> {
let mut swarm = Swarm::create_with_capacity(10);
for i in 0..10 {
swarm.add_bot(Bot::create(format!("nurtex_{}", i)));
}
swarm.instant_launch();
tokio::time::sleep(Duration::from_secs(3)).await;
swarm.for_each_parallel(async |bot| {
let position = bot.get_position().await;
let rotation = bot.get_rotation().await;
println!("[{}] Позиция: {:?}, Ротация: {:?}", bot.username(), position, rotation);
Ok(())
});
tokio::time::sleep(Duration::from_secs(8)).await;
swarm.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_quiet() -> std::io::Result<()> {
let mut bots = Vec::new();
for i in 0..10 {
bots.push(Bot::create(format!("nurtex_{}", i)));
}
let mut swarm = Swarm::create_with_capacity(10)
.with_bots(bots)
.with_join_delay(JoinDelay::fixed(200))
.bind("localhost", 25565);
let handle = swarm.quiet_launch();
tokio::time::sleep(Duration::from_secs(1)).await;
handle.abort();
swarm.shutdown().await?;
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
}
#[tokio::test]
async fn test_wait_handles() -> std::io::Result<()> {
let mut swarm = Swarm::create_with_capacity(6).with_join_delay(JoinDelay::fixed(200)).bind("localhost", 25565);
for i in 0..6 {
swarm.add_bot(Bot::create(format!("nurtex_{}", i)));
}
swarm.launch().await;
swarm.wait_handles().await;
Ok(())
}
#[tokio::test]
async fn test_shared_storage() -> std::io::Result<()> {
let mut swarm = Swarm::create_with_capacity(6).with_join_delay(JoinDelay::fixed(200)).bind("localhost", 25565);
for i in 0..6 {
swarm.add_bot(Bot::create(format!("nurtex_{}", i)));
}
swarm.launch().await;
for _ in 0..5 {
let storage = swarm.get_shared_storage();
let entities = { storage.entities.read().await.clone() };
println!("Сущности: {:?}", entities);
tokio::time::sleep(Duration::from_secs(3)).await;
}
Ok(())
}
#[tokio::test]
async fn test_shared_handlers() {
let mut handlers = Handlers::new();
handlers.on_login(async |username| {
println!("Бот {} залогинился", username);
Ok(())
});
handlers.on_spawn(async |username| {
println!("Бот {} заспавнился", username);
Ok(())
});
handlers.on_chat(async |username, payload| {
println!("Бот {} получил сообщение: {}", username, payload.message);
Ok(())
});
handlers.on_disconnect(async |username, payload| {
println!("Бот {} отключился в состоянии: {:?}", username, payload.state);
Ok(())
});
let mut swarm = Swarm::create().with_join_delay(JoinDelay::fixed(50)).with_handlers(handlers).bind("localhost", 25565);
for i in 0..200 {
swarm.add_bot(Bot::create(format!("nurtex_{}", i)));
}
swarm.launch().await;
swarm.wait_handles().await
}
#[tokio::test]
async fn test_random() {
Swarm::create_random(10)
.with_join_delay(JoinDelay::fixed(50))
.bind("localhost", 25565)
.launch_and_wait()
.await
}
}