use std::io::{self};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::bot::Bot;
use crate::storage::Storage;
use crate::swarm::{JoinDelay, TargetServer};
pub struct Swarm {
pub bots: Vec<Arc<Bot>>,
target_server: Arc<RwLock<TargetServer>>,
join_delay: Arc<JoinDelay>,
handles: Vec<JoinHandle<core::result::Result<(), std::io::Error>>>,
shared_storage: Arc<RwLock<Storage>>,
}
impl Swarm {
pub fn create() -> Self {
Self {
bots: Vec::new(),
target_server: Arc::new(RwLock::new(TargetServer::default())),
join_delay: Arc::new(JoinDelay::fixed(1000)),
handles: Vec::new(),
shared_storage: Arc::new(RwLock::new(Storage::null())),
}
}
pub fn create_with_capacity(capacity: usize) -> Self {
Self {
bots: Vec::with_capacity(capacity),
target_server: Arc::new(RwLock::new(TargetServer::default())),
join_delay: Arc::new(JoinDelay::fixed(1000)),
handles: Vec::with_capacity(capacity),
shared_storage: Arc::new(RwLock::new(Storage::null())),
}
}
pub fn set_shared_storage(mut self, storage: Arc<RwLock<Storage>>) -> Self {
self.shared_storage = storage;
self
}
pub fn set_join_delay(mut self, join_delay: JoinDelay) -> Self {
self.join_delay = Arc::new(join_delay);
self
}
pub fn bind(self, server_host: impl Into<String>, server_port: u16) -> Self {
match self.target_server.try_write() {
Ok(mut guard) => {
guard.host = server_host.into();
guard.port = server_port;
}
Err(_) => {}
};
self
}
pub async fn rebind(&self, server_host: impl Into<String>, server_port: u16) {
let mut guard = self.target_server.write().await;
guard.host = server_host.into();
guard.port = server_port;
}
pub fn get_shared_storage(&self) -> Arc<RwLock<Storage>> {
Arc::clone(&self.shared_storage)
}
pub async fn for_each_consistent<F, Fut>(&self, f: F)
where
F: Fn(Arc<Bot>) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
for i in &self.bots {
let bot = Arc::clone(i);
f(bot).await;
}
}
pub fn for_each_parallel<F, Fut>(&self, f: F)
where
F: Fn(Arc<Bot>) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
let f = Arc::new(f);
for i in &self.bots {
let f_clone = Arc::clone(&f);
let bot = Arc::clone(i);
tokio::spawn(f_clone(bot));
}
}
pub fn add_bot(&mut self, bot: Bot) {
self.bots.push(Arc::new(bot.set_storage(Arc::clone(&self.shared_storage))));
}
pub fn add_bots(&mut self, bots: Vec<Bot>) {
for bot in bots {
self.bots.push(Arc::new(bot.set_storage(Arc::clone(&self.shared_storage))));
}
}
pub fn with_bot(mut self, bot: Bot) -> Self {
self.bots.push(Arc::new(bot.set_storage(Arc::clone(&self.shared_storage))));
self
}
pub fn with_bots(mut self, bots: Vec<Bot>) -> Self {
for bot in bots {
self.bots.push(Arc::new(bot.set_storage(Arc::clone(&self.shared_storage))));
}
self
}
pub fn username_is_unique(&self, username: &str) -> bool {
for bot in &self.bots {
if username == bot.username() {
return false;
}
}
true
}
pub async fn launch(&mut self) {
let total_bots = self.bots.len();
let (host, port) = {
let guard = self.target_server.read().await;
(guard.host.clone(), guard.port)
};
for (index, bot) in self.bots.iter().enumerate() {
let handle = bot.connect_with_handle(&host, port);
self.handles.push(handle);
let delay = self.join_delay.activate(index, total_bots);
if index < total_bots - 1 {
tokio::time::sleep(Duration::from_millis(delay)).await;
}
}
}
pub async fn launch_and_wait(&mut self) {
self.launch().await;
self.wait_handles().await;
}
pub fn instant_launch(&mut self) {
let (host, port) = match self.target_server.try_read() {
Ok(g) => (g.host.clone(), g.port),
Err(_) => ("localhost".to_string(), 25565),
};
for bot in &self.bots {
let handle = bot.connect_with_handle(&host, port);
self.handles.push(handle);
}
}
pub fn quiet_launch(&self) -> JoinHandle<()> {
let total_bots = self.bots.len();
let join_delay = Arc::clone(&self.join_delay);
let bots = {
let mut vec = Vec::new();
for bot in &self.bots {
vec.push(Arc::clone(bot));
}
vec
};
let (host, port) = match self.target_server.try_read() {
Ok(g) => (g.host.clone(), g.port),
Err(_) => ("localhost".to_string(), 25565),
};
tokio::spawn(async move {
let mut handles = Vec::with_capacity(total_bots);
for (index, bot) in bots.iter().enumerate() {
let handle = bot.connect_with_handle(&host, port);
handles.push(handle);
let delay = join_delay.activate(index, total_bots);
if index < total_bots - 1 {
tokio::time::sleep(Duration::from_millis(delay)).await;
}
}
for handle in handles {
let _ = handle.await;
}
})
}
pub fn bots_count(&self) -> usize {
self.bots.len()
}
pub fn handles_count(&self) -> usize {
self.handles.len()
}
pub fn is_null(&self) -> bool {
self.bots.is_empty()
}
pub fn get_bot_usernames(&self) -> Vec<String> {
self.bots.iter().map(|bot| bot.username().to_string()).collect()
}
pub async fn shutdown(&mut self) -> io::Result<()> {
self.abort_handles();
tokio::time::sleep(Duration::from_millis(100)).await;
for bot in &self.bots {
bot.shutdown().await?;
}
self.handles.clear();
self.bots.clear();
self.shared_storage.write().await.clear();
Ok(())
}
pub fn abort_handles(&self) {
for handle in &self.handles {
handle.abort();
}
}
pub async fn wait_handles(&mut self) {
for handle in &mut self.handles {
if !handle.is_finished() {
let _ = handle.await;
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::bot::Bot;
use crate::swarm::{JoinDelay, Swarm};
#[tokio::test]
async fn test_instant() -> std::io::Result<()> {
let mut swarm = Swarm::create_with_capacity(10);
for i in 0..10 {
swarm.add_bot(Bot::create(format!("nurtex_{}", i)));
}
swarm.instant_launch();
tokio::time::sleep(Duration::from_secs(3)).await;
swarm.for_each_parallel(async |bot| {
let position = bot.get_position().await;
let rotation = bot.get_rotation().await;
println!("[{}] Позиция: {:?}, Ротация: {:?}", bot.username(), position, rotation);
});
tokio::time::sleep(Duration::from_secs(8)).await;
swarm.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_quiet() -> std::io::Result<()> {
let mut bots = Vec::new();
for i in 0..10 {
bots.push(Bot::create(format!("nurtex_{}", i)));
}
let mut swarm = Swarm::create_with_capacity(10)
.with_bots(bots)
.set_join_delay(JoinDelay::fixed(200))
.bind("localhost", 25565);
let handle = swarm.quiet_launch();
tokio::time::sleep(Duration::from_secs(1)).await;
handle.abort();
swarm.shutdown().await?;
tokio::time::sleep(Duration::from_secs(5)).await;
Ok(())
}
#[tokio::test]
async fn test_wait_handles() -> std::io::Result<()> {
let mut swarm = Swarm::create_with_capacity(6).set_join_delay(JoinDelay::fixed(200)).bind("localhost", 25565);
for i in 0..6 {
swarm.add_bot(Bot::create(format!("nurtex_{}", i)));
}
swarm.launch().await;
swarm.wait_handles().await;
Ok(())
}
#[tokio::test]
async fn test_shared_storage() -> std::io::Result<()> {
let mut swarm = Swarm::create_with_capacity(6).set_join_delay(JoinDelay::fixed(200)).bind("localhost", 25565);
for i in 0..6 {
swarm.add_bot(Bot::create(format!("nurtex_{}", i)));
}
swarm.launch().await;
for _ in 0..5 {
let storage = swarm.get_shared_storage();
let entities = {
let guard = storage.read().await;
guard.entities.clone()
};
println!("Сущности: {:?}", entities);
tokio::time::sleep(Duration::from_secs(3)).await;
}
Ok(())
}
}