battleware_randotron/
lib.rs

1mod state_machine;
2
3use anyhow::Result;
4use battleware_client::Client;
5use battleware_types::{
6    api::{Update, UpdatesFilter, MAX_SUBMISSION_TRANSACTIONS},
7    execution::{Account, Key, Value},
8    Query,
9};
10use commonware_cryptography::{
11    bls12381::primitives::variant::{MinSig, Variant},
12    ed25519::{PrivateKey, PublicKey},
13    PrivateKeyExt, Signer,
14};
15use commonware_macros::select;
16use commonware_runtime::{Clock, Metrics, Spawner};
17use commonware_storage::store::operation::Variable;
18use futures::future::join_all;
19use rand::SeedableRng;
20use rand_chacha::ChaCha20Rng;
21use state_machine::Engine as BotEngine;
22use std::{
23    collections::{hash_map::Entry, HashMap},
24    sync::{
25        atomic::{AtomicUsize, Ordering},
26        Arc,
27    },
28    time::{Duration, Instant},
29};
30use tracing::{error, info, warn};
31
/// How long the update stream may stay silent before `run` treats the connection as stale.
const CONNECTION_STALE_THRESHOLD: Duration = Duration::from_secs(10);
/// Pause between checks while waiting for in-flight uploads to drop below the cap.
const UPLOADS_OUTSTANDING_WAIT_THRESHOLD: Duration = Duration::from_millis(250);
/// Threshold passed to the bot's `stuck` query when looking for stuck accounts.
const STUCK_THRESHOLD: usize = 3;
/// Largest number of simultaneously stuck accounts tolerated before `run` panics.
const STUCK_EXIT_THRESHOLD: usize = 20;
/// Number of accounts fetched concurrently per batch.
const BATCH_FETCH_SIZE: usize = 50;
/// Number of upload tasks allowed in flight before new submissions are held back.
const MAX_UPLOADS_OUTSTANDING: usize = 5;
/// Length, in bytes, of the RNG seed.
pub const SEED_LENGTH: usize = 32;
39
40/// Configuration for the randotron engine
41pub struct EngineConfig {
42    pub num_keys: usize,
43    pub network_identity: <MinSig as Variant>::Public,
44    pub seed: [u8; SEED_LENGTH],
45}
46
47/// Configuration for randotron deployment (from config file)
48#[derive(serde::Serialize, serde::Deserialize)]
49pub struct Config {
50    pub num_keys: usize,
51    pub base_url: String,
52    pub network_identity: String,
53    pub log_level: String,
54    pub seed: String,
55    pub worker_threads: usize,
56}
57
58/// Main engine for running the randotron
59pub struct Engine<E: Clock + Spawner + Metrics> {
60    context: E,
61    config: EngineConfig,
62    client: Client,
63    bot: BotEngine,
64    last_account_batch: Option<Instant>,
65}
66
67impl<E: Clock + Spawner + Metrics> Engine<E> {
68    pub async fn new(context: E, config: EngineConfig, client: Client) -> Result<Self> {
69        // Initialize bot engine
70        let bot = BotEngine::new(config.network_identity);
71
72        Ok(Self {
73            context,
74            config,
75            client,
76            bot,
77            last_account_batch: None,
78        })
79    }
80
81    async fn load_account(
82        client: &Client,
83        account: &PublicKey,
84    ) -> Result<Option<(Account, Option<Value>)>> {
85        // Fetch current state
86        let Some(state) = client.query_state(&Key::Account(account.clone())).await? else {
87            error!("Account {} not found", account);
88            return Ok(None);
89        };
90        let Variable::Update(_, Value::Account(state)) = state.operation else {
91            panic!("Expected account update");
92        };
93
94        // If account in battle, fetch battle info
95        let battle = if let Some(battle) = state.battle {
96            // Both of these errors can happen during asynchrony
97            let Some(battle) = client.query_state(&Key::Battle(battle)).await? else {
98                return Err(anyhow::anyhow!("Battle not found"));
99            };
100            let Variable::Update(_, battle) = battle.operation else {
101                return Err(anyhow::anyhow!("Expected battle update"));
102            };
103            Some(battle)
104        } else {
105            None
106        };
107
108        Ok(Some((state, battle)))
109    }
110
111    async fn load_accounts(&mut self, accounts: Vec<PublicKey>) {
112        // Process accounts in batches
113        for chunk in accounts.chunks(BATCH_FETCH_SIZE) {
114            // Load batch concurrently
115            let client = &self.client;
116            let jobs = chunk.iter().map(|public| async move {
117                Self::load_account(client, public)
118                    .await
119                    .unwrap_or_else(|e| {
120                        warn!(?public, ?e, "Failed to load account");
121                        None
122                    })
123            });
124            let results = join_all(jobs).await;
125
126            // Process results and refresh bot state
127            for (public, result) in chunk.iter().cloned().zip(results) {
128                let Some((state, battle)) = result else {
129                    continue;
130                };
131                self.bot.refresh_account(public, state, battle);
132            }
133        }
134    }
135
136    pub async fn run(mut self) {
137        // Create new RNG (must recreate to start with same accounts each loop)
138        let mut rng = ChaCha20Rng::from_seed(self.config.seed);
139
140        // Subscribe to all updates
141        let mut stream = self
142            .client
143            .connect_updates(UpdatesFilter::All)
144            .await
145            .expect("failed to connect to updates");
146
147        // Loop until stream is closed
148        let mut seed_cache = HashMap::new();
149        let uploads_outstanding = Arc::new(AtomicUsize::new(0));
150        loop {
151            // Check if we need to add more accounts
152            let now = Instant::now();
153            if self.bot.accounts() < self.config.num_keys {
154                let should_add = self
155                    .last_account_batch
156                    .is_none_or(|last| now.duration_since(last) >= Duration::from_secs(1));
157                if should_add {
158                    // Generate private key
159                    let private = PrivateKey::from_rng(&mut rng);
160                    let public = private.public_key();
161                    self.bot.add_account(private);
162
163                    // Load accounts
164                    self.load_accounts(vec![public]).await;
165                    info!(
166                        elapsed = ?now.elapsed(),
167                        total = self.bot.accounts(),
168                        "Initialized account",
169                    );
170
171                    // Update last batch time
172                    self.last_account_batch = Some(Instant::now());
173                }
174            }
175
176            // Process update
177            let (is_seed, requested, mut txs) = select! {
178                update = stream.next() => {
179                    let update = update.expect("stream closed").expect("failed to handle update");
180                    let is_seed = matches!(update, Update::Seed(_));
181                    let (requested, txs) = self.bot.apply(update);
182                    (is_seed, requested, txs)
183                },
184                _ = self.context.sleep(CONNECTION_STALE_THRESHOLD) => {
185                    warn!("Connection stale");
186                    return;
187                }
188            };
189
190            // Collect necessary seeds
191            if is_seed && requested.is_empty() && !seed_cache.is_empty() {
192                // No missing seeds necessary, ok to clear
193                warn!(size = seed_cache.len(), "Cleared missing seeds");
194                seed_cache.clear();
195            }
196            for index in requested {
197                // Fetch missing seed
198                if let Entry::Vacant(entry) = seed_cache.entry(index) {
199                    let Ok(Some(seed)) = self.client.query_seed(Query::Index(index)).await else {
200                        warn!("Failed to request seed: {}", index);
201                        continue;
202                    };
203                    entry.insert(seed);
204                }
205
206                // Apply seed (don't worry about missing seeds, we can get them next time if still needed)
207                let (_, new_txs) = self.bot.apply_seed(seed_cache[&index].clone());
208                txs.extend(new_txs);
209            }
210
211            // Wait for uploads to be under max
212            loop {
213                let outstanding = uploads_outstanding.load(Ordering::Relaxed);
214                if outstanding < MAX_UPLOADS_OUTSTANDING {
215                    break;
216                }
217                warn!(outstanding, "Waiting for uploads to be under max");
218                self.context.sleep(UPLOADS_OUTSTANDING_WAIT_THRESHOLD).await;
219            }
220
221            // Submit transactions (fire-and-forget)
222            while !txs.is_empty() {
223                uploads_outstanding.fetch_add(1, Ordering::Relaxed);
224                let chunk: Vec<_> = txs
225                    .drain(..MAX_SUBMISSION_TRANSACTIONS.min(txs.len()))
226                    .collect();
227                self.context.with_label("submit").spawn({
228                    let client = self.client.clone();
229                    let uploads_outstanding = uploads_outstanding.clone();
230                    move |_| async move {
231                        if let Err(e) = client.submit_transactions(chunk).await {
232                            warn!("Failed to submit transaction: {}", e);
233                        }
234                        uploads_outstanding.fetch_sub(1, Ordering::Relaxed);
235                    }
236                });
237            }
238
239            // Check for stuck accounts
240            //
241            // This can occur when we reload an account and assign it to the "battle" state even though
242            // the battle has just ended (can occur because of asynchrony)
243            let stuck = self.bot.stuck(STUCK_THRESHOLD);
244            assert!(
245                stuck.len() <= STUCK_EXIT_THRESHOLD,
246                "Exceeded stuck threshold"
247            );
248            if !stuck.is_empty() {
249                warn!(?stuck, "Refreshing stuck accounts");
250                self.load_accounts(stuck).await;
251            }
252
253            // Get stats
254            let (uninitialized, generating, lobby, battle) = self.bot.stats();
255            info!(uninitialized, generating, lobby, battle, "Stats");
256        }
257    }
258}