1mod state_machine;
2
3use anyhow::Result;
4use battleware_client::Client;
5use battleware_types::{
6 api::{Update, UpdatesFilter, MAX_SUBMISSION_TRANSACTIONS},
7 execution::{Account, Key, Value},
8 Query,
9};
10use commonware_cryptography::{
11 bls12381::primitives::variant::{MinSig, Variant},
12 ed25519::{PrivateKey, PublicKey},
13 PrivateKeyExt, Signer,
14};
15use commonware_macros::select;
16use commonware_runtime::{Clock, Metrics, Spawner};
17use commonware_storage::store::operation::Variable;
18use futures::future::join_all;
19use rand::SeedableRng;
20use rand_chacha::ChaCha20Rng;
21use state_machine::Engine as BotEngine;
22use std::{
23 collections::{hash_map::Entry, HashMap},
24 sync::{
25 atomic::{AtomicUsize, Ordering},
26 Arc,
27 },
28 time::{Duration, Instant},
29};
30use tracing::{error, info, warn};
31
/// Longest time `Engine::run` waits for the next update before it logs
/// "Connection stale" and returns.
const CONNECTION_STALE_THRESHOLD: Duration = Duration::from_secs(10);

/// Sleep between successive checks of the outstanding-upload counter while it
/// is at or above [`MAX_UPLOADS_OUTSTANDING`].
const UPLOADS_OUTSTANDING_WAIT_THRESHOLD: Duration = Duration::from_millis(250);

/// Argument handed to the bot's `stuck` query on every loop iteration.
///
/// NOTE(review): presumably how long an account may go without progress before
/// it is reported as stuck — confirm against `state_machine`.
const STUCK_THRESHOLD: usize = 3;

/// Largest number of simultaneously stuck accounts tolerated; `Engine::run`
/// asserts (and therefore panics) when the stuck list is longer than this.
const STUCK_EXIT_THRESHOLD: usize = 20;

/// Number of accounts `Engine::load_accounts` fetches concurrently per chunk.
const BATCH_FETCH_SIZE: usize = 50;

/// Number of in-flight submit tasks at which `Engine::run` pauses before
/// spawning further submissions.
const MAX_UPLOADS_OUTSTANDING: usize = 5;

/// Length in bytes of the RNG seed stored in `EngineConfig::seed`.
pub const SEED_LENGTH: usize = 32;
39
40pub struct EngineConfig {
42 pub num_keys: usize,
43 pub network_identity: <MinSig as Variant>::Public,
44 pub seed: [u8; SEED_LENGTH],
45}
46
47#[derive(serde::Serialize, serde::Deserialize)]
49pub struct Config {
50 pub num_keys: usize,
51 pub base_url: String,
52 pub network_identity: String,
53 pub log_level: String,
54 pub seed: String,
55 pub worker_threads: usize,
56}
57
58pub struct Engine<E: Clock + Spawner + Metrics> {
60 context: E,
61 config: EngineConfig,
62 client: Client,
63 bot: BotEngine,
64 last_account_batch: Option<Instant>,
65}
66
67impl<E: Clock + Spawner + Metrics> Engine<E> {
68 pub async fn new(context: E, config: EngineConfig, client: Client) -> Result<Self> {
69 let bot = BotEngine::new(config.network_identity);
71
72 Ok(Self {
73 context,
74 config,
75 client,
76 bot,
77 last_account_batch: None,
78 })
79 }
80
81 async fn load_account(
82 client: &Client,
83 account: &PublicKey,
84 ) -> Result<Option<(Account, Option<Value>)>> {
85 let Some(state) = client.query_state(&Key::Account(account.clone())).await? else {
87 error!("Account {} not found", account);
88 return Ok(None);
89 };
90 let Variable::Update(_, Value::Account(state)) = state.operation else {
91 panic!("Expected account update");
92 };
93
94 let battle = if let Some(battle) = state.battle {
96 let Some(battle) = client.query_state(&Key::Battle(battle)).await? else {
98 return Err(anyhow::anyhow!("Battle not found"));
99 };
100 let Variable::Update(_, battle) = battle.operation else {
101 return Err(anyhow::anyhow!("Expected battle update"));
102 };
103 Some(battle)
104 } else {
105 None
106 };
107
108 Ok(Some((state, battle)))
109 }
110
111 async fn load_accounts(&mut self, accounts: Vec<PublicKey>) {
112 for chunk in accounts.chunks(BATCH_FETCH_SIZE) {
114 let client = &self.client;
116 let jobs = chunk.iter().map(|public| async move {
117 Self::load_account(client, public)
118 .await
119 .unwrap_or_else(|e| {
120 warn!(?public, ?e, "Failed to load account");
121 None
122 })
123 });
124 let results = join_all(jobs).await;
125
126 for (public, result) in chunk.iter().cloned().zip(results) {
128 let Some((state, battle)) = result else {
129 continue;
130 };
131 self.bot.refresh_account(public, state, battle);
132 }
133 }
134 }
135
136 pub async fn run(mut self) {
137 let mut rng = ChaCha20Rng::from_seed(self.config.seed);
139
140 let mut stream = self
142 .client
143 .connect_updates(UpdatesFilter::All)
144 .await
145 .expect("failed to connect to updates");
146
147 let mut seed_cache = HashMap::new();
149 let uploads_outstanding = Arc::new(AtomicUsize::new(0));
150 loop {
151 let now = Instant::now();
153 if self.bot.accounts() < self.config.num_keys {
154 let should_add = self
155 .last_account_batch
156 .is_none_or(|last| now.duration_since(last) >= Duration::from_secs(1));
157 if should_add {
158 let private = PrivateKey::from_rng(&mut rng);
160 let public = private.public_key();
161 self.bot.add_account(private);
162
163 self.load_accounts(vec![public]).await;
165 info!(
166 elapsed = ?now.elapsed(),
167 total = self.bot.accounts(),
168 "Initialized account",
169 );
170
171 self.last_account_batch = Some(Instant::now());
173 }
174 }
175
176 let (is_seed, requested, mut txs) = select! {
178 update = stream.next() => {
179 let update = update.expect("stream closed").expect("failed to handle update");
180 let is_seed = matches!(update, Update::Seed(_));
181 let (requested, txs) = self.bot.apply(update);
182 (is_seed, requested, txs)
183 },
184 _ = self.context.sleep(CONNECTION_STALE_THRESHOLD) => {
185 warn!("Connection stale");
186 return;
187 }
188 };
189
190 if is_seed && requested.is_empty() && !seed_cache.is_empty() {
192 warn!(size = seed_cache.len(), "Cleared missing seeds");
194 seed_cache.clear();
195 }
196 for index in requested {
197 if let Entry::Vacant(entry) = seed_cache.entry(index) {
199 let Ok(Some(seed)) = self.client.query_seed(Query::Index(index)).await else {
200 warn!("Failed to request seed: {}", index);
201 continue;
202 };
203 entry.insert(seed);
204 }
205
206 let (_, new_txs) = self.bot.apply_seed(seed_cache[&index].clone());
208 txs.extend(new_txs);
209 }
210
211 loop {
213 let outstanding = uploads_outstanding.load(Ordering::Relaxed);
214 if outstanding < MAX_UPLOADS_OUTSTANDING {
215 break;
216 }
217 warn!(outstanding, "Waiting for uploads to be under max");
218 self.context.sleep(UPLOADS_OUTSTANDING_WAIT_THRESHOLD).await;
219 }
220
221 while !txs.is_empty() {
223 uploads_outstanding.fetch_add(1, Ordering::Relaxed);
224 let chunk: Vec<_> = txs
225 .drain(..MAX_SUBMISSION_TRANSACTIONS.min(txs.len()))
226 .collect();
227 self.context.with_label("submit").spawn({
228 let client = self.client.clone();
229 let uploads_outstanding = uploads_outstanding.clone();
230 move |_| async move {
231 if let Err(e) = client.submit_transactions(chunk).await {
232 warn!("Failed to submit transaction: {}", e);
233 }
234 uploads_outstanding.fetch_sub(1, Ordering::Relaxed);
235 }
236 });
237 }
238
239 let stuck = self.bot.stuck(STUCK_THRESHOLD);
244 assert!(
245 stuck.len() <= STUCK_EXIT_THRESHOLD,
246 "Exceeded stuck threshold"
247 );
248 if !stuck.is_empty() {
249 warn!(?stuck, "Refreshing stuck accounts");
250 self.load_accounts(stuck).await;
251 }
252
253 let (uninitialized, generating, lobby, battle) = self.bot.stats();
255 info!(uninitialized, generating, lobby, battle, "Stats");
256 }
257 }
258}