1use axum::{
2 body::Bytes,
3 extract::{ws::WebSocketUpgrade, State as AxumState},
4 http::{header, Method, StatusCode},
5 response::IntoResponse,
6 routing::{get, post},
7 Router,
8};
9use battleware_types::{
10 api::{Events, FilteredEvents, Lookup, Pending, Submission, Summary, Update, UpdatesFilter},
11 execution::{Event, Output, Progress, Seed, Transaction, Value},
12 Identity, Query, NAMESPACE,
13};
14use commonware_codec::{DecodeExt, Encode};
15use commonware_consensus::{aggregation::types::Certificate, Viewable};
16use commonware_cryptography::{
17 bls12381::primitives::variant::MinSig, ed25519::PublicKey, sha256::Digest,
18};
19use commonware_storage::{
20 adb::{
21 create_multi_proof, create_proof, create_proof_store_from_digests,
22 digests_required_for_proof,
23 },
24 store::operation::{Keyless, Variable},
25};
26use commonware_utils::from_hex;
27use futures::{SinkExt, StreamExt};
28use std::{
29 collections::{BTreeMap, HashMap, HashSet},
30 sync::{Arc, RwLock},
31};
32use tokio::sync::broadcast;
33use tower_http::cors::{Any, CorsLayer};
34
35#[derive(Clone)]
36#[allow(clippy::large_enum_variant)]
37pub enum InternalUpdate {
38 Seed(Seed),
39 Events(Events, Vec<(u64, Digest)>),
40}
41
42#[derive(Default)]
43pub struct State {
44 seeds: BTreeMap<u64, Seed>,
45
46 nodes: BTreeMap<u64, Digest>,
47 leaves: BTreeMap<u64, Variable<Digest, Value>>,
48 #[allow(clippy::type_complexity)]
49 keys: HashMap<Digest, BTreeMap<u64, (u64, Variable<Digest, Value>)>>,
50 progress: BTreeMap<u64, (Progress, Certificate<MinSig, Digest>)>,
51
52 submitted_events: HashSet<u64>,
53 submitted_state: HashSet<u64>,
54}
55
56#[derive(Clone)]
57pub struct Simulator {
58 identity: Identity,
59 state: Arc<RwLock<State>>,
60 update_tx: broadcast::Sender<InternalUpdate>,
61 mempool_tx: broadcast::Sender<Pending>,
62}
63
64impl Simulator {
65 pub fn new(identity: Identity) -> Self {
66 let (update_tx, _) = broadcast::channel(1024);
67 let (mempool_tx, _) = broadcast::channel(1024);
68 let state = Arc::new(RwLock::new(State::default()));
69
70 Self {
71 identity,
72 state,
73 update_tx,
74 mempool_tx,
75 }
76 }
77}
78
79impl Simulator {
80 pub fn submit_seed(&self, seed: Seed) {
81 let mut state = self.state.write().unwrap();
82 if state.seeds.insert(seed.view(), seed.clone()).is_some() {
83 return;
84 }
85 let _ = self.update_tx.send(InternalUpdate::Seed(seed));
86 }
87
88 pub fn submit_transactions(&self, transactions: Vec<Transaction>) {
89 let _ = self.mempool_tx.send(Pending { transactions });
90 }
91
92 pub fn submit_state(&self, summary: Summary, inner: Vec<(u64, Digest)>) {
93 let mut state = self.state.write().unwrap();
94 if !state.submitted_state.insert(summary.progress.height) {
95 return;
96 }
97
98 for (pos, digest) in inner {
100 state.nodes.insert(pos, digest);
101 }
102
103 let start_loc = summary.progress.state_start_op;
105 for (i, value) in summary.state_proof_ops.into_iter().enumerate() {
106 let loc = start_loc + i as u64;
108 state.leaves.insert(loc, value.clone());
109
110 match value {
112 Variable::Update(key, value) => {
113 state
114 .keys
115 .entry(key)
116 .or_default()
117 .insert(summary.progress.height, (loc, Variable::Update(key, value)));
118 }
119 Variable::Delete(key) => {
120 state
121 .keys
122 .entry(key)
123 .or_default()
124 .insert(summary.progress.height, (loc, Variable::Delete(key)));
125 }
126 _ => {}
127 }
128 }
129
130 state.progress.insert(
132 summary.progress.height,
133 (summary.progress, summary.certificate),
134 );
135 }
136
137 pub fn submit_events(&self, summary: Summary, events_digests: Vec<(u64, Digest)>) {
138 let mut state = self.state.write().unwrap();
139 let height = summary.progress.height;
140 if !state.submitted_events.insert(height) {
141 return;
142 }
143
144 let _ = self.update_tx.send(InternalUpdate::Events(
146 Events {
147 progress: summary.progress,
148 certificate: summary.certificate,
149 events_proof: summary.events_proof,
150 events_proof_ops: summary.events_proof_ops,
151 },
152 events_digests,
153 ));
154 }
155
156 pub fn query_state(&self, key: &Digest) -> Option<Lookup> {
157 let state = self.state.read().unwrap();
158 let (height, operation) = state.keys.get(key)?.last_key_value()?;
159 let (loc, Variable::Update(_, value)) = operation else {
160 return None;
161 };
162
163 let (progress, certificate) = state.progress.get(height)?;
165
166 let required_digest_positions =
168 digests_required_for_proof::<Digest>(progress.state_end_op, *loc, *loc);
169 let required_digests = required_digest_positions
170 .iter()
171 .map(|pos| state.nodes.get(pos).cloned().unwrap())
172 .collect::<Vec<_>>();
173
174 let proof = create_proof(progress.state_end_op, required_digests);
176
177 Some(Lookup {
178 progress: *progress,
179 certificate: certificate.clone(),
180 proof,
181 location: *loc,
182 operation: Variable::Update(*key, value.clone()),
183 })
184 }
185
186 pub fn query_seed(&self, query: &Query) -> Option<Seed> {
187 let state = self.state.read().unwrap();
188 match query {
189 Query::Latest => state.seeds.last_key_value().map(|(_, seed)| seed.clone()),
190 Query::Index(index) => state.seeds.get(index).cloned(),
191 }
192 }
193
194 pub fn update_subscriber(&self) -> broadcast::Receiver<InternalUpdate> {
195 self.update_tx.subscribe()
196 }
197
198 pub fn mempool_subscriber(&self) -> broadcast::Receiver<Pending> {
199 self.mempool_tx.subscribe()
200 }
201}
202
203pub struct Api {
204 simulator: Arc<Simulator>,
205}
206
207impl Api {
208 pub fn new(simulator: Arc<Simulator>) -> Self {
209 Self { simulator }
210 }
211
212 pub fn router(&self) -> Router {
213 let cors = CorsLayer::new()
215 .allow_origin(Any)
216 .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
217 .allow_headers([header::CONTENT_TYPE]);
218
219 Router::new()
220 .route("/submit", post(submit))
221 .route("/seed/:query", get(query_seed))
222 .route("/state/:query", get(query_state))
223 .route("/updates/:filter", get(updates_ws))
224 .route("/mempool", get(mempool_ws))
225 .layer(cors)
226 .with_state(self.simulator.clone())
227 }
228}
229
230async fn submit(AxumState(simulator): AxumState<Arc<Simulator>>, body: Bytes) -> impl IntoResponse {
231 let submission = match Submission::decode(&mut body.as_ref()) {
232 Ok(submission) => submission,
233 Err(_) => return StatusCode::BAD_REQUEST,
234 };
235
236 match submission {
237 Submission::Seed(seed) => {
238 if !seed.verify(NAMESPACE, &simulator.identity) {
239 return StatusCode::BAD_REQUEST;
240 }
241 simulator.submit_seed(seed);
242 StatusCode::OK
243 }
244 Submission::Transactions(txs) => {
245 simulator.submit_transactions(txs);
246 StatusCode::OK
247 }
248 Submission::Summary(summary) => {
249 let Some((state_digests, events_digests)) = summary.verify(&simulator.identity) else {
250 return StatusCode::BAD_REQUEST;
251 };
252 simulator.submit_events(summary.clone(), events_digests);
253 simulator.submit_state(summary, state_digests);
254 StatusCode::OK
255 }
256 }
257}
258
259async fn query_state(
260 AxumState(simulator): AxumState<Arc<Simulator>>,
261 axum::extract::Path(query): axum::extract::Path<String>,
262) -> impl IntoResponse {
263 let raw = match from_hex(&query) {
264 Some(raw) => raw,
265 None => return StatusCode::BAD_REQUEST.into_response(),
266 };
267 let key = match Digest::decode(&mut raw.as_slice()) {
268 Ok(key) => key,
269 Err(_) => return StatusCode::BAD_REQUEST.into_response(),
270 };
271 match simulator.query_state(&key) {
272 Some(value) => (StatusCode::OK, value.encode().to_vec()).into_response(),
273 None => (StatusCode::NOT_FOUND, vec![]).into_response(),
274 }
275}
276
277async fn query_seed(
278 AxumState(simulator): AxumState<Arc<Simulator>>,
279 axum::extract::Path(query): axum::extract::Path<String>,
280) -> impl IntoResponse {
281 let raw = match from_hex(&query) {
282 Some(raw) => raw,
283 None => return StatusCode::BAD_REQUEST.into_response(),
284 };
285 let query = match Query::decode(&mut raw.as_slice()) {
286 Ok(query) => query,
287 Err(_) => return StatusCode::BAD_REQUEST.into_response(),
288 };
289 match simulator.query_seed(&query) {
290 Some(seed) => (StatusCode::OK, seed.encode().to_vec()).into_response(),
291 None => (StatusCode::NOT_FOUND, vec![]).into_response(),
292 }
293}
294
295async fn updates_ws(
296 AxumState(simulator): AxumState<Arc<Simulator>>,
297 axum::extract::Path(filter): axum::extract::Path<String>,
298 ws: WebSocketUpgrade,
299) -> impl IntoResponse {
300 ws.on_upgrade(move |socket| handle_updates_ws(socket, simulator, filter))
301}
302
303async fn mempool_ws(
304 AxumState(simulator): AxumState<Arc<Simulator>>,
305 ws: WebSocketUpgrade,
306) -> impl IntoResponse {
307 ws.on_upgrade(move |socket| handle_mempool_ws(socket, simulator))
308}
309
310async fn handle_updates_ws(
311 socket: axum::extract::ws::WebSocket,
312 simulator: Arc<Simulator>,
313 filter: String,
314) {
315 let (mut sender, _receiver) = socket.split();
316 let mut updates = simulator.update_subscriber();
317
318 let filter = match from_hex(&filter) {
320 Some(filter) => filter,
321 None => return,
322 };
323 let subscription = match UpdatesFilter::decode(&mut filter.as_slice()) {
324 Ok(subscription) => subscription,
325 Err(_) => return,
326 };
327
328 while let Ok(internal_update) = updates.recv().await {
330 let update = match internal_update {
332 InternalUpdate::Seed(seed) => Some(Update::Seed(seed)),
333 InternalUpdate::Events(events, digests) => match &subscription {
334 UpdatesFilter::All => Some(Update::Events(events)),
335 UpdatesFilter::Account(account) => {
336 filter_updates_for_account(events, digests, account).await
337 }
338 },
339 };
340 let Some(update) = update else {
341 continue;
342 };
343
344 if sender
346 .send(axum::extract::ws::Message::Binary(update.encode().to_vec()))
347 .await
348 .is_err()
349 {
350 break;
351 }
352 }
353}
354
355async fn handle_mempool_ws(socket: axum::extract::ws::WebSocket, simulator: Arc<Simulator>) {
356 let (mut sender, _receiver) = socket.split();
357 let mut txs = simulator.mempool_subscriber();
358
359 while let Ok(tx) = txs.recv().await {
360 if sender
361 .send(axum::extract::ws::Message::Binary(tx.encode().to_vec()))
362 .await
363 .is_err()
364 {
365 break;
366 }
367 }
368}
369
370async fn filter_updates_for_account(
371 events: Events,
372 digests: Vec<(u64, Digest)>,
373 account: &PublicKey,
374) -> Option<Update> {
375 let mut filtered_ops = Vec::new();
377 for (i, op) in events.events_proof_ops.into_iter().enumerate() {
378 let should_include = match &op {
379 Keyless::Append(output) => match output {
380 Output::Event(event) => is_event_relevant_to_account(event, account),
381 Output::Transaction(tx) => tx.public == *account,
382 _ => false,
383 },
384 Keyless::Commit(_) => false,
385 };
386 if should_include {
387 filtered_ops.push((events.progress.events_start_op + i as u64, op));
389 }
390 }
391
392 if filtered_ops.is_empty() {
394 return None;
395 }
396
397 let proof_store = create_proof_store_from_digests(&events.events_proof, digests);
400
401 let locations_to_include = filtered_ops.iter().map(|(loc, _)| *loc).collect::<Vec<_>>();
403 let filtered_proof = create_multi_proof(&proof_store, &locations_to_include)
404 .await
405 .expect("failed to generate filtered proof");
406 Some(Update::FilteredEvents(FilteredEvents {
407 progress: events.progress,
408 certificate: events.certificate,
409 events_proof: filtered_proof,
410 events_proof_ops: filtered_ops,
411 }))
412}
413
414fn is_event_relevant_to_account(event: &Event, account: &PublicKey) -> bool {
415 match event {
416 Event::Generated {
417 account: player, ..
418 } => account == player,
419 Event::Matched {
420 player_a, player_b, ..
421 } => player_a == account || player_b == account,
422 Event::Locked {
423 locker, observer, ..
424 } => locker == account || observer == account,
425 Event::Moved {
426 player_a, player_b, ..
427 } => player_a == account || player_b == account,
428 Event::Settled {
429 player_a, player_b, ..
430 } => player_a == account || player_b == account,
431 }
432}
433
434#[cfg(test)]
435mod tests {
436 use super::*;
437 use battleware_execution::mocks::{
438 create_account_keypair, create_adbs, create_network_keypair, create_seed, execute_block,
439 };
440 use battleware_types::execution::{Instruction, Key, Stats, Transaction, Value};
441 use commonware_cryptography::{Hasher, Sha256};
442 use commonware_runtime::{deterministic::Runner, Runner as _};
443 use commonware_storage::store::operation::Variable;
444 use futures::executor::block_on;
445
446 #[test]
447 fn test_submit_seed() {
448 let (network_secret, network_identity) = create_network_keypair();
449 let simulator = Simulator::new(network_identity);
450 let mut update_stream = simulator.update_subscriber();
451
452 let seed = create_seed(&network_secret, 1);
454 simulator.submit_seed(seed.clone());
455 let received_update = block_on(async { update_stream.recv().await.unwrap() });
456 match received_update {
457 InternalUpdate::Seed(received_seed) => assert_eq!(received_seed, seed),
458 _ => panic!("Expected seed update"),
459 }
460 assert_eq!(simulator.query_seed(&Query::Latest), Some(seed.clone()));
461 assert_eq!(simulator.query_seed(&Query::Index(1)), Some(seed));
462
463 let seed = create_seed(&network_secret, 3);
465 simulator.submit_seed(seed.clone());
466 let received_update = block_on(async { update_stream.recv().await.unwrap() });
467 match received_update {
468 InternalUpdate::Seed(received_seed) => assert_eq!(received_seed, seed),
469 _ => panic!("Expected seed update"),
470 }
471 assert_eq!(simulator.query_seed(&Query::Latest), Some(seed.clone()));
472 assert_eq!(simulator.query_seed(&Query::Index(2)), None);
473 assert_eq!(simulator.query_seed(&Query::Index(3)), Some(seed.clone()));
474 }
475
476 #[test]
477 fn test_submit_transaction() {
478 let (_, network_identity) = create_network_keypair();
479 let simulator = Simulator::new(network_identity);
480 let mut mempool_rx = simulator.mempool_subscriber();
481
482 let (private, _) = create_account_keypair(1);
483 let tx = Transaction::sign(&private, 1, Instruction::Generate);
484
485 simulator.submit_transactions(vec![tx.clone()]);
486
487 let received_txs = block_on(async { mempool_rx.recv().await.unwrap() });
488 assert_eq!(received_txs.transactions.len(), 1);
489 let received_tx = &received_txs.transactions[0];
490 assert_eq!(received_tx.public, tx.public);
491 assert_eq!(received_tx.nonce, tx.nonce);
492 }
493
494 #[test]
495 fn test_submit_summary() {
496 let executor = Runner::default();
497 executor.start(|context| async move {
498 let (network_secret, network_identity) = create_network_keypair();
500 let simulator = Simulator::new(network_identity);
501 let (mut state, mut events) = create_adbs(&context).await;
502
503 let (private, public) = create_account_keypair(1);
505 let tx = Transaction::sign(&private, 0, Instruction::Generate);
506
507 let (_, summary) = execute_block(
509 &network_secret,
510 network_identity,
511 &mut state,
512 &mut events,
513 1, vec![tx],
515 )
516 .await;
517
518 let (state_digests, events_digests) = summary
520 .verify(&network_identity)
521 .expect("Summary verification failed");
522
523 let mut update_stream = simulator.update_subscriber();
525 simulator.submit_events(summary.clone(), events_digests);
526
527 let update_recv = update_stream.recv().await.unwrap();
529 match update_recv {
530 InternalUpdate::Events(events_recv, _) => {
531 assert!(events_recv.verify(&network_identity));
532 assert_eq!(events_recv.events_proof, summary.events_proof);
533 assert_eq!(events_recv.events_proof_ops, summary.events_proof_ops);
534 }
535 _ => panic!("Expected events update"),
536 }
537
538 simulator.submit_state(summary.clone(), state_digests);
540
541 let account_key = Sha256::hash(&Key::Account(public.clone()).encode());
543 let lookup = simulator.query_state(&account_key).unwrap();
544 assert!(lookup.verify(&network_identity));
545 let Variable::Update(_, Value::Account(account)) = lookup.operation else {
546 panic!("account not found");
547 };
548 assert_eq!(account.nonce, 1);
549 assert_eq!(account.battle, None);
550 assert_eq!(account.stats, Stats::default());
551
552 let (_, other_public) = create_account_keypair(2);
554 let other_key = Sha256::hash(&Key::Account(other_public).encode());
555 assert!(simulator.query_state(&other_key).is_none());
556 });
557 }
558
559 #[test]
560 fn test_filtered_events() {
561 let executor = Runner::default();
562 executor.start(|context| async move {
563 let (network_secret, network_identity) = create_network_keypair();
565 let simulator = Simulator::new(network_identity);
566 let (mut state, mut events) = create_adbs(&context).await;
567
568 let (private1, public1) = create_account_keypair(1);
570 let (private2, _public2) = create_account_keypair(2);
571 let (private3, _public3) = create_account_keypair(3);
572
573 let txs = vec![
575 Transaction::sign(&private1, 0, Instruction::Generate),
576 Transaction::sign(&private2, 0, Instruction::Generate),
577 Transaction::sign(&private3, 0, Instruction::Generate),
578 ];
579
580 let (_, summary) = execute_block(
582 &network_secret,
583 network_identity,
584 &mut state,
585 &mut events,
586 1, txs,
588 )
589 .await;
590
591 let (state_digests, events_digests) = summary.verify(&network_identity).unwrap();
593 simulator.submit_events(summary.clone(), events_digests.clone());
594 simulator.submit_state(summary.clone(), state_digests);
595
596 let original_ops_count = summary.events_proof_ops.len();
598
599 let events = Events {
600 progress: summary.progress,
601 certificate: summary.certificate,
602 events_proof: summary.events_proof,
603 events_proof_ops: summary.events_proof_ops,
604 };
605
606 let filtered = filter_updates_for_account(events, events_digests, &public1)
608 .await
609 .unwrap();
610
611 match filtered {
613 Update::FilteredEvents(filtered_events) => {
614 let included_count = filtered_events.events_proof_ops.len();
616
617 for (_loc, op) in &filtered_events.events_proof_ops {
619 if let Keyless::Append(Output::Event(Event::Generated {
620 account, ..
621 })) = op
622 {
623 assert_eq!(
624 account, &public1,
625 "Filtered events should only contain account1"
626 );
627 }
628 }
629
630 assert!(
632 included_count > 0,
633 "Should have at least one included event"
634 );
635 assert!(
636 included_count < original_ops_count,
637 "Should have filtered out some events"
638 );
639
640 assert!(
642 filtered_events.verify(&network_identity),
643 "Multi-proof verification should pass"
644 );
645 }
646 _ => panic!("Expected FilteredEvents"),
647 }
648 });
649 }
650
651 #[test]
652 fn test_multiple_transactions_per_block() {
653 let executor = Runner::default();
654 executor.start(|context| async move {
655 let (network_secret, network_identity) = create_network_keypair();
657 let simulator = Simulator::new(network_identity);
658 let (mut state, mut events) = create_adbs(&context).await;
659
660 let accounts: Vec<_> = (0..5).map(create_account_keypair).collect();
662
663 let txs1: Vec<_> = accounts
665 .iter()
666 .map(|(private, _)| Transaction::sign(private, 0, Instruction::Generate))
667 .collect();
668
669 let (_, summary1) = execute_block(
670 &network_secret,
671 network_identity,
672 &mut state,
673 &mut events,
674 1, txs1.clone(),
676 )
677 .await;
678
679 let (state_digests1, events_digests1) = summary1
681 .verify(&network_identity)
682 .expect("Summary 1 verification failed");
683 simulator.submit_events(summary1.clone(), events_digests1);
684 simulator.submit_state(summary1.clone(), state_digests1);
685
686 assert_eq!(summary1.progress.height, 1);
688
689 for (_, public) in accounts.iter() {
691 let account_key = Sha256::hash(&Key::Account(public.clone()).encode());
692 let lookup = simulator.query_state(&account_key).unwrap();
693 assert!(lookup.verify(&network_identity));
694 let Variable::Update(_, Value::Account(account)) = lookup.operation else {
695 panic!("Account not found for {public:?}");
696 };
697 assert_eq!(account.nonce, 1);
698 assert!(account.creature.is_some());
699 }
700
701 let txs2: Vec<_> = accounts
703 .iter()
704 .take(3)
705 .map(|(private, _)| Transaction::sign(private, 1, Instruction::Generate))
706 .collect();
707
708 let (_, summary2) = execute_block(
709 &network_secret,
710 network_identity,
711 &mut state,
712 &mut events,
713 5, txs2,
715 )
716 .await;
717
718 let (state_digests2, events_digests2) = summary2
720 .verify(&network_identity)
721 .expect("Summary 2 verification failed");
722 simulator.submit_events(summary2.clone(), events_digests2);
723 simulator.submit_state(summary2.clone(), state_digests2);
724
725 assert_eq!(summary2.progress.height, 2);
727
728 for (i, (_, public)) in accounts.iter().enumerate() {
730 let account_key = Sha256::hash(&Key::Account(public.clone()).encode());
731 let lookup = simulator.query_state(&account_key).unwrap();
732 assert!(lookup.verify(&network_identity));
733 let Variable::Update(_, Value::Account(account)) = lookup.operation else {
734 panic!("Account not found for {public:?}");
735 };
736 let expected_nonce = if i < 3 { 2 } else { 1 };
738 assert_eq!(account.nonce, expected_nonce);
739 }
740 });
741 }
742}