1use alto_client::LATEST;
2use alto_types::{Block, Finalized, Kind, Notarized, Scheme, Seed};
3use axum::{
4 body::Bytes,
5 extract::{ws::WebSocketUpgrade, Path, State as AxumState},
6 http::StatusCode,
7 response::IntoResponse,
8 routing::{get, post},
9 Router,
10};
11use commonware_codec::{DecodeExt, Encode, EncodeSize, FixedSize, Write};
12use commonware_consensus::{types::View, Viewable};
13use commonware_cryptography::{sha256::Digest, Digestible};
14use commonware_parallel::Strategy;
15use commonware_utils::from_hex;
16use futures::{SinkExt, StreamExt};
17use std::{
18 collections::BTreeMap,
19 sync::{Arc, RwLock},
20};
21use tokio::sync::broadcast;
22use tower_http::cors::CorsLayer;
23
24#[derive(Default)]
25pub struct State {
26 seeds: BTreeMap<View, Seed>,
27 notarizations: BTreeMap<View, Notarized>,
28 finalizations: BTreeMap<View, Finalized>,
29 finalized_height_to_view: BTreeMap<u64, View>,
30 blocks_by_digest: BTreeMap<Digest, Block>,
31}
32
33#[derive(Clone)]
34pub struct Indexer<S: Strategy> {
35 scheme: Scheme,
36 state: Arc<RwLock<State>>,
37 consensus_tx: broadcast::Sender<Vec<u8>>,
38 strategy: S,
39}
40
41impl<S: Strategy> Indexer<S> {
42 pub fn new(scheme: Scheme, strategy: S) -> Self {
43 let (consensus_tx, _) = broadcast::channel(1024);
44 let state = Arc::new(RwLock::new(State::default()));
45
46 Self {
47 scheme,
48 state,
49 consensus_tx,
50 strategy,
51 }
52 }
53
54 pub fn submit_seed(&self, seed: Seed) -> Result<(), &'static str> {
55 if !seed.verify(&self.scheme) {
57 return Err("Invalid seed signature");
58 }
59
60 let mut state = self.state.write().unwrap();
61 if state.seeds.insert(seed.view(), seed.clone()).is_some() {
62 return Ok(()); }
64
65 let mut data = vec![0u8; u8::SIZE + seed.encode_size()];
67 data[0] = Kind::Seed as u8;
68 seed.write(&mut data[1..].as_mut());
69 let _ = self.consensus_tx.send(data);
70 Ok(())
71 }
72
73 pub fn get_seed(&self, query: &str) -> Option<Seed> {
74 let state = self.state.read().unwrap();
75 if query == LATEST {
76 state.seeds.last_key_value().map(|(_, seed)| seed.clone())
77 } else {
78 let raw = from_hex(query)?;
80 let index = u64::decode(raw.as_slice()).ok()?;
81 state.seeds.get(&View::new(index)).cloned()
82 }
83 }
84
85 pub fn submit_notarization(&self, notarized: Notarized) -> Result<(), &'static str> {
86 if !notarized.verify(&self.scheme, &self.strategy) {
88 return Err("Invalid notarization signature");
89 }
90
91 let mut state = self.state.write().unwrap();
92
93 state
95 .blocks_by_digest
96 .insert(notarized.block.digest(), notarized.block.clone());
97
98 let view = notarized.proof.view();
100 if state
101 .notarizations
102 .insert(view, notarized.clone())
103 .is_some()
104 {
105 return Ok(()); }
107
108 let mut data = vec![0u8; u8::SIZE + notarized.encode_size()];
110 data[0] = Kind::Notarization as u8;
111 notarized.write(&mut data[1..].as_mut());
112 let _ = self.consensus_tx.send(data);
113 Ok(())
114 }
115
116 pub fn get_notarization(&self, query: &str) -> Option<Notarized> {
117 let state = self.state.read().unwrap();
118 if query == LATEST {
119 state.notarizations.last_key_value().map(|(_, n)| n.clone())
120 } else {
121 let raw = from_hex(query)?;
123 let index = u64::decode(raw.as_slice()).ok()?;
124 state.notarizations.get(&View::new(index)).cloned()
125 }
126 }
127
128 pub fn submit_finalization(&self, finalized: Finalized) -> Result<(), &'static str> {
129 if !finalized.verify(&self.scheme, &self.strategy) {
131 return Err("Invalid finalization signature");
132 }
133
134 let mut state = self.state.write().unwrap();
135
136 state
138 .blocks_by_digest
139 .insert(finalized.block.digest(), finalized.block.clone());
140
141 let view = finalized.proof.view();
143 if state
144 .finalizations
145 .insert(view, finalized.clone())
146 .is_some()
147 {
148 return Ok(()); }
150 state
151 .finalized_height_to_view
152 .insert(finalized.block.height.get(), view);
153
154 let mut data = vec![0u8; u8::SIZE + finalized.encode_size()];
156 data[0] = Kind::Finalization as u8;
157 finalized.write(&mut data[1..].as_mut());
158 let _ = self.consensus_tx.send(data);
159 Ok(())
160 }
161
162 pub fn get_finalization(&self, query: &str) -> Option<Finalized> {
163 let state = self.state.read().unwrap();
164 if query == LATEST {
165 state.finalizations.last_key_value().map(|(_, f)| f.clone())
166 } else {
167 let raw = from_hex(query)?;
169 let index = u64::decode(raw.as_slice()).ok()?;
170 state.finalizations.get(&View::new(index)).cloned()
171 }
172 }
173
174 pub fn get_block(&self, query: &str) -> Option<BlockResult> {
175 let state = self.state.read().unwrap();
176
177 if query == LATEST {
178 state
180 .finalizations
181 .last_key_value()
182 .map(|(_, f)| BlockResult::Finalized(f.clone()))
183 } else if let Some(raw) = from_hex(query) {
184 if raw.len() == u64::SIZE {
186 let index = u64::decode(raw.as_slice()).ok()?;
187 state.finalized_height_to_view.get(&index).and_then(|view| {
188 state
189 .finalizations
190 .get(view)
191 .map(|f| BlockResult::Finalized(f.clone()))
192 })
193 } else if raw.len() == Digest::SIZE {
194 let digest = Digest::decode(raw.as_slice()).ok()?;
195 state
196 .blocks_by_digest
197 .get(&digest)
198 .map(|b| BlockResult::Block(b.clone()))
199 } else {
200 None
201 }
202 } else {
203 None
204 }
205 }
206
207 pub fn consensus_subscriber(&self) -> broadcast::Receiver<Vec<u8>> {
208 self.consensus_tx.subscribe()
209 }
210}
211
212#[allow(clippy::large_enum_variant)]
213pub enum BlockResult {
214 Block(Block),
215 Finalized(Finalized),
216}
217
218pub struct Api<S: Strategy> {
219 indexer: Arc<Indexer<S>>,
220}
221
222impl<S: Strategy> Api<S> {
223 pub fn new(indexer: Arc<Indexer<S>>) -> Self {
224 Self { indexer }
225 }
226
227 pub fn router(self) -> Router {
228 Router::new()
229 .route("/health", get(health_check))
230 .route("/seed", post(seed_upload))
231 .route("/seed/{query}", get(seed_get))
232 .route("/notarization", post(notarization_upload))
233 .route("/notarization/{query}", get(notarization_get))
234 .route("/finalization", post(finalization_upload))
235 .route("/finalization/{query}", get(finalization_get))
236 .route("/block/{query}", get(block_get))
237 .route("/consensus/ws", get(consensus_ws))
238 .layer(CorsLayer::permissive())
239 .with_state(self.indexer)
240 }
241}
242
243async fn health_check() -> impl IntoResponse {
244 (StatusCode::OK, "ok")
245}
246
247async fn seed_upload<S: Strategy>(
248 AxumState(indexer): AxumState<Arc<Indexer<S>>>,
249 body: Bytes,
250) -> impl IntoResponse {
251 match Seed::decode(&mut body.as_ref()) {
252 Ok(seed) => match indexer.submit_seed(seed) {
253 Ok(_) => StatusCode::OK,
254 Err(_) => StatusCode::UNAUTHORIZED,
255 },
256 Err(_) => StatusCode::BAD_REQUEST,
257 }
258}
259
260async fn seed_get<S: Strategy>(
261 AxumState(indexer): AxumState<Arc<Indexer<S>>>,
262 Path(query): Path<String>,
263) -> impl IntoResponse {
264 match indexer.get_seed(&query) {
265 Some(seed) => (StatusCode::OK, seed.encode().to_vec()).into_response(),
266 None => StatusCode::NOT_FOUND.into_response(),
267 }
268}
269
270async fn notarization_upload<S: Strategy>(
271 AxumState(indexer): AxumState<Arc<Indexer<S>>>,
272 body: Bytes,
273) -> impl IntoResponse {
274 match Notarized::decode(&mut body.as_ref()) {
275 Ok(notarized) => match indexer.submit_notarization(notarized) {
276 Ok(_) => StatusCode::OK,
277 Err(_) => StatusCode::UNAUTHORIZED,
278 },
279 Err(_) => StatusCode::BAD_REQUEST,
280 }
281}
282
283async fn notarization_get<S: Strategy>(
284 AxumState(indexer): AxumState<Arc<Indexer<S>>>,
285 Path(query): Path<String>,
286) -> impl IntoResponse {
287 match indexer.get_notarization(&query) {
288 Some(notarized) => (StatusCode::OK, notarized.encode().to_vec()).into_response(),
289 None => StatusCode::NOT_FOUND.into_response(),
290 }
291}
292
293async fn finalization_upload<S: Strategy>(
294 AxumState(indexer): AxumState<Arc<Indexer<S>>>,
295 body: Bytes,
296) -> impl IntoResponse {
297 match Finalized::decode(&mut body.as_ref()) {
298 Ok(finalized) => match indexer.submit_finalization(finalized) {
299 Ok(_) => StatusCode::OK,
300 Err(_) => StatusCode::UNAUTHORIZED,
301 },
302 Err(_) => StatusCode::BAD_REQUEST,
303 }
304}
305
306async fn finalization_get<S: Strategy>(
307 AxumState(indexer): AxumState<Arc<Indexer<S>>>,
308 Path(query): Path<String>,
309) -> impl IntoResponse {
310 match indexer.get_finalization(&query) {
311 Some(finalized) => (StatusCode::OK, finalized.encode().to_vec()).into_response(),
312 None => StatusCode::NOT_FOUND.into_response(),
313 }
314}
315
316async fn block_get<S: Strategy>(
317 AxumState(indexer): AxumState<Arc<Indexer<S>>>,
318 Path(query): Path<String>,
319) -> impl IntoResponse {
320 match indexer.get_block(&query) {
321 Some(BlockResult::Block(block)) => {
322 (StatusCode::OK, block.encode().to_vec()).into_response()
323 }
324 Some(BlockResult::Finalized(finalized)) => {
325 (StatusCode::OK, finalized.encode().to_vec()).into_response()
326 }
327 None => StatusCode::NOT_FOUND.into_response(),
328 }
329}
330
331async fn consensus_ws<S: Strategy>(
332 AxumState(indexer): AxumState<Arc<Indexer<S>>>,
333 ws: WebSocketUpgrade,
334) -> impl IntoResponse {
335 ws.on_upgrade(move |socket| handle_consensus_ws(socket, indexer))
336}
337
338async fn handle_consensus_ws<S: Strategy>(
339 socket: axum::extract::ws::WebSocket,
340 indexer: Arc<Indexer<S>>,
341) {
342 let (mut sender, _receiver) = socket.split();
343 let mut consensus = indexer.consensus_subscriber();
344
345 while let Ok(data) = consensus.recv().await {
346 if sender
347 .send(axum::extract::ws::Message::Binary(data.into()))
348 .await
349 .is_err()
350 {
351 break;
352 }
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use super::*;
359 use alto_client::{Client, ClientBuilder, IndexQuery, Query};
360 use alto_types::{Context, Identity, Seedable, EPOCH, NAMESPACE};
361 use commonware_consensus::{
362 simplex::{
363 scheme::bls12381_threshold::vrf as bls12381_threshold,
364 types::{Finalization, Finalize, Notarization, Notarize, Proposal},
365 },
366 types::{Height, Round, View},
367 Viewable,
368 };
369 use commonware_cryptography::{
370 bls12381::primitives::variant::MinSig, certificate::mocks::Fixture, ed25519, sha256,
371 Digest, Digestible, Hasher, Sha256, Signer,
372 };
373 use commonware_parallel::Sequential;
374 use futures::StreamExt;
375 use rand::{rngs::StdRng, SeedableRng};
376 use rcgen::{generate_simple_self_signed, CertifiedKey, KeyPair};
377 use rustls::pki_types::{CertificateDer, PrivateKeyDer};
378 use std::net::SocketAddr;
379 use tokio::net::TcpListener;
380 use tokio_rustls::TlsAcceptor;
381 use tower::ServiceExt;
382
383 struct TestContext {
385 schemes: Vec<Scheme>,
386 client: Client<Sequential>,
387 }
388
389 impl TestContext {
390 async fn new() -> Self {
392 let mut rng = StdRng::seed_from_u64(0);
393 let Fixture { schemes, .. } =
394 bls12381_threshold::fixture::<MinSig, _>(&mut rng, NAMESPACE, 4);
395 let identity = *schemes[0].polynomial().public();
396
397 let (addr, _) = start_server(schemes[0].clone(), Sequential).await;
398 let client = Client::new(&format!("http://{addr}"), identity, Sequential);
399 wait_for_ready(&client).await;
400
401 Self { schemes, client }
402 }
403
404 fn test_block(&self) -> Block {
406 let context = Context {
407 round: Round::new(EPOCH, View::new(1)),
408 leader: ed25519::PrivateKey::from_seed(0).public_key(),
409 parent: (View::new(0), sha256::Digest::EMPTY),
410 };
411 Block::new(context, Sha256::hash(b"genesis"), Height::new(1), 1000)
412 }
413
414 fn proposal(&self, block: &Block) -> Proposal<sha256::Digest> {
416 Proposal::new(
417 Round::new(EPOCH, View::new(1)),
418 View::new(0),
419 block.digest(),
420 )
421 }
422
423 fn seed(&self) -> Seed {
425 let block = self.test_block();
426 let proposal = self.proposal(&block);
427 create_notarization(&self.schemes, proposal).seed()
428 }
429
430 fn notarized(&self) -> Notarized {
432 let block = self.test_block();
433 let proposal = self.proposal(&block);
434 Notarized::new(create_notarization(&self.schemes, proposal), block)
435 }
436
437 fn finalized(&self) -> Finalized {
439 let block = self.test_block();
440 let proposal = self.proposal(&block);
441 Finalized::new(create_finalization(&self.schemes, proposal), block)
442 }
443 }
444
445 fn create_notarization(
446 schemes: &[Scheme],
447 proposal: Proposal<sha256::Digest>,
448 ) -> alto_types::Notarization {
449 let notarizes: Vec<_> = schemes
450 .iter()
451 .map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
452 .collect();
453 Notarization::from_notarizes(&schemes[0], ¬arizes, &Sequential).unwrap()
454 }
455
456 fn create_finalization(
457 schemes: &[Scheme],
458 proposal: Proposal<sha256::Digest>,
459 ) -> alto_types::Finalization {
460 let finalizes: Vec<_> = schemes
461 .iter()
462 .map(|scheme| Finalize::sign(scheme, proposal.clone()).unwrap())
463 .collect();
464 Finalization::from_finalizes(&schemes[0], &finalizes, &Sequential).unwrap()
465 }
466
467 async fn start_server(
468 scheme: Scheme,
469 strategy: impl Strategy,
470 ) -> (SocketAddr, tokio::task::JoinHandle<()>) {
471 let indexer = Arc::new(Indexer::new(scheme, strategy));
472 let api = Api::new(indexer);
473 let app = api.router();
474
475 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
476 let addr = listener.local_addr().unwrap();
477
478 let handle = tokio::spawn(async move {
479 axum::serve(listener, app).await.unwrap();
480 });
481
482 (addr, handle)
483 }
484
485 async fn wait_for_ready(client: &Client<Sequential>) {
486 loop {
487 if client.health().await.is_ok() {
488 return;
489 }
490 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
491 }
492 }
493
494 fn fixture(seed: u64) -> (Vec<Scheme>, Identity) {
495 let mut rng = StdRng::seed_from_u64(seed);
496 let Fixture { schemes, .. } =
497 bls12381_threshold::fixture::<MinSig, _>(&mut rng, NAMESPACE, 4);
498 let identity = *schemes[0].polynomial().public();
499 (schemes, identity)
500 }
501
502 #[tokio::test]
503 async fn test_seed_operations() {
504 let ctx = TestContext::new().await;
505 let seed = ctx.seed();
506
507 ctx.client.seed_upload(seed.clone()).await.unwrap();
508
509 let retrieved = ctx.client.seed_get(IndexQuery::Latest).await.unwrap();
510 assert_eq!(retrieved.view(), seed.view());
511
512 let retrieved = ctx.client.seed_get(IndexQuery::Index(1)).await.unwrap();
513 assert_eq!(retrieved.view().get(), 1);
514 }
515
516 #[tokio::test]
517 async fn test_notarization_operations() {
518 let ctx = TestContext::new().await;
519 let notarized = ctx.notarized();
520
521 ctx.client.notarized_upload(notarized).await.unwrap();
522
523 let retrieved = ctx.client.notarized_get(IndexQuery::Latest).await.unwrap();
524 assert_eq!(retrieved.proof.view().get(), 1);
525
526 let retrieved = ctx
527 .client
528 .notarized_get(IndexQuery::Index(1))
529 .await
530 .unwrap();
531 assert_eq!(retrieved.proof.view().get(), 1);
532 }
533
534 #[tokio::test]
535 async fn test_finalization_operations() {
536 let ctx = TestContext::new().await;
537 let finalized = ctx.finalized();
538
539 ctx.client.finalized_upload(finalized).await.unwrap();
540
541 let retrieved = ctx.client.finalized_get(IndexQuery::Latest).await.unwrap();
542 assert_eq!(retrieved.proof.view().get(), 1);
543
544 let retrieved = ctx
545 .client
546 .finalized_get(IndexQuery::Index(1))
547 .await
548 .unwrap();
549 assert_eq!(retrieved.proof.view().get(), 1);
550 }
551
552 #[tokio::test]
553 async fn test_block_retrieval() {
554 let ctx = TestContext::new().await;
555 let block = ctx.test_block();
556 let finalized = ctx.finalized();
557
558 ctx.client.finalized_upload(finalized).await.unwrap();
559
560 let payload = ctx.client.block_get(Query::Latest).await.unwrap();
562 match payload {
563 alto_client::consensus::Payload::Finalized(f) => {
564 assert_eq!(f.block.height.get(), 1);
565 }
566 _ => panic!("Expected finalized block"),
567 }
568
569 let payload = ctx.client.block_get(Query::Index(1)).await.unwrap();
571 match payload {
572 alto_client::consensus::Payload::Finalized(f) => {
573 assert_eq!(f.block.height.get(), 1);
574 }
575 _ => panic!("Expected finalized block"),
576 }
577
578 let payload = ctx
580 .client
581 .block_get(Query::Digest(block.digest()))
582 .await
583 .unwrap();
584 match payload {
585 alto_client::consensus::Payload::Block(b) => {
586 assert_eq!(b.digest(), block.digest());
587 }
588 _ => panic!("Expected block"),
589 }
590 }
591
592 #[tokio::test]
593 async fn test_websocket_streaming() {
594 let ctx = TestContext::new().await;
595 let seed = ctx.seed();
596
597 let mut stream = ctx.client.listen().await.unwrap();
598
599 let (tx, rx) = tokio::sync::oneshot::channel();
601 let client = ctx.client.clone();
602 tokio::spawn(async move {
603 rx.await.unwrap();
604 client.seed_upload(seed).await.unwrap();
605 });
606
607 tx.send(()).unwrap();
609 if let Some(Ok(msg)) = stream.next().await {
610 match msg {
611 alto_client::consensus::Message::Seed(s) => {
612 assert_eq!(s.view().get(), 1);
613 }
614 _ => panic!("Expected seed message"),
615 }
616 } else {
617 panic!("Expected to receive a message");
618 }
619 }
620
621 #[tokio::test]
622 async fn test_identity_verification() {
623 let (schemes1, _) = fixture(0);
625 let (_, identity2) = fixture(1);
626
627 let (addr, _handle) = start_server(schemes1[0].clone(), Sequential).await;
629 let client = Client::new(&format!("http://{addr}"), identity2, Sequential);
630 wait_for_ready(&client).await;
631
632 let context = Context {
634 round: Round::new(EPOCH, View::new(1)),
635 leader: ed25519::PrivateKey::from_seed(0).public_key(),
636 parent: (View::new(0), sha256::Digest::EMPTY),
637 };
638 let block = Block::new(context, Sha256::hash(b"genesis"), Height::new(1), 1000);
639 let proposal = Proposal::new(
640 Round::new(EPOCH, View::new(1)),
641 View::new(0),
642 block.digest(),
643 );
644 let seed = create_notarization(&schemes1, proposal).seed();
645
646 client.seed_upload(seed).await.unwrap();
648
649 let result = client.seed_get(IndexQuery::Latest).await;
651 assert!(result.is_err());
652 }
653
654 #[tokio::test]
655 async fn test_invalid_signature_rejection() {
656 let ctx = TestContext::new().await;
657
658 let (wrong_schemes, _) = fixture(1);
660
661 let block = ctx.test_block();
663 let proposal = ctx.proposal(&block);
664 let bad_seed = create_notarization(&wrong_schemes, proposal).seed();
665
666 let result = ctx.client.seed_upload(bad_seed).await;
668 assert!(result.is_err());
669 }
670
671 fn generate_self_signed_cert() -> CertifiedKey<KeyPair> {
672 let subject_alt_names = vec!["localhost".to_string(), "127.0.0.1".to_string()];
673 generate_simple_self_signed(subject_alt_names).unwrap()
674 }
675
676 async fn start_tls_server(
677 scheme: Scheme,
678 cert_key: &CertifiedKey<KeyPair>,
679 strategy: impl Strategy,
680 ) -> (SocketAddr, tokio::task::JoinHandle<()>) {
681 let indexer = Arc::new(Indexer::new(scheme, strategy));
682 let api = Api::new(indexer);
683 let app = api.router();
684
685 let cert_der = CertificateDer::from(cert_key.cert.der().to_vec());
687 let key_der = PrivateKeyDer::try_from(cert_key.signing_key.serialize_der()).unwrap();
688
689 let server_config = rustls::ServerConfig::builder_with_provider(Arc::new(
690 rustls::crypto::aws_lc_rs::default_provider(),
691 ))
692 .with_safe_default_protocol_versions()
693 .unwrap()
694 .with_no_client_auth()
695 .with_single_cert(vec![cert_der], key_der)
696 .expect("Failed to create server config");
697 let tls_acceptor = TlsAcceptor::from(Arc::new(server_config));
698
699 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
700 let addr = listener.local_addr().unwrap();
701
702 let handle = tokio::spawn(async move {
703 loop {
704 let (stream, _) = listener.accept().await.unwrap();
705 let tls_acceptor = tls_acceptor.clone();
706 let app = app.clone();
707
708 tokio::spawn(async move {
709 let tls_stream = match tls_acceptor.accept(stream).await {
710 Ok(s) => s,
711 Err(_) => return,
712 };
713
714 let io = hyper_util::rt::TokioIo::new(tls_stream);
715 let service = hyper::service::service_fn(move |req| {
716 let app = app.clone();
717 async move { app.oneshot(req).await }
718 });
719 let _ = hyper_util::server::conn::auto::Builder::new(
720 hyper_util::rt::TokioExecutor::new(),
721 )
722 .serve_connection_with_upgrades(io, service)
723 .await;
724 });
725 }
726 });
727
728 (addr, handle)
729 }
730
731 fn create_tls_client(
732 addr: SocketAddr,
733 identity: Identity,
734 cert_key: &CertifiedKey<KeyPair>,
735 ) -> Client<Sequential> {
736 ClientBuilder::new(&format!("https://{addr}"), identity, Sequential)
737 .with_tls_cert(cert_key.cert.der().to_vec())
738 .build()
739 }
740
741 #[tokio::test]
742 async fn test_tls_https_connection() {
743 let cert_key = generate_self_signed_cert();
744
745 let mut rng = StdRng::seed_from_u64(0);
746 let Fixture { schemes, .. } =
747 bls12381_threshold::fixture::<MinSig, _>(&mut rng, NAMESPACE, 4);
748 let identity = *schemes[0].polynomial().public();
749
750 let (addr, handle) = start_tls_server(schemes[0].clone(), &cert_key, Sequential).await;
751 let client = create_tls_client(addr, identity, &cert_key);
752 wait_for_ready(&client).await;
753
754 let context = Context {
756 round: Round::new(EPOCH, View::new(1)),
757 leader: ed25519::PrivateKey::from_seed(0).public_key(),
758 parent: (View::new(0), sha256::Digest::EMPTY),
759 };
760 let block = Block::new(context, Sha256::hash(b"genesis"), Height::new(1), 1000);
761 let proposal = Proposal::new(
762 Round::new(EPOCH, View::new(1)),
763 View::new(0),
764 block.digest(),
765 );
766 let seed = create_notarization(&schemes, proposal).seed();
767
768 client.seed_upload(seed.clone()).await.unwrap();
770
771 let retrieved = client.seed_get(IndexQuery::Latest).await.unwrap();
773 assert_eq!(retrieved.view(), seed.view());
774
775 handle.abort();
776 }
777
778 #[tokio::test]
779 async fn test_tls_websocket_connection() {
780 let cert_key = generate_self_signed_cert();
781
782 let mut rng = StdRng::seed_from_u64(0);
783 let Fixture { schemes, .. } =
784 bls12381_threshold::fixture::<MinSig, _>(&mut rng, NAMESPACE, 4);
785 let identity = *schemes[0].polynomial().public();
786
787 let (addr, handle) = start_tls_server(schemes[0].clone(), &cert_key, Sequential).await;
788 let client = create_tls_client(addr, identity, &cert_key);
789 wait_for_ready(&client).await;
790
791 let context = Context {
793 round: Round::new(EPOCH, View::new(1)),
794 leader: ed25519::PrivateKey::from_seed(0).public_key(),
795 parent: (View::new(0), sha256::Digest::EMPTY),
796 };
797 let block = Block::new(context, Sha256::hash(b"genesis"), Height::new(1), 1000);
798 let proposal = Proposal::new(
799 Round::new(EPOCH, View::new(1)),
800 View::new(0),
801 block.digest(),
802 );
803 let seed = create_notarization(&schemes, proposal).seed();
804
805 let mut stream = client.listen().await.unwrap();
807
808 let (tx, rx) = tokio::sync::oneshot::channel();
810 let upload_client = client.clone();
811 tokio::spawn(async move {
812 rx.await.unwrap();
813 upload_client.seed_upload(seed).await.unwrap();
814 });
815
816 tx.send(()).unwrap();
818 if let Some(Ok(msg)) = stream.next().await {
819 match msg {
820 alto_client::consensus::Message::Seed(s) => {
821 assert_eq!(s.view().get(), 1);
822 }
823 _ => panic!("Expected seed message"),
824 }
825 } else {
826 panic!("Expected to receive a message");
827 }
828
829 handle.abort();
830 }
831}