Skip to main content

alto_indexer/
lib.rs

1use alto_client::LATEST;
2use alto_types::{Block, Finalized, Kind, Notarized, Scheme, Seed};
3use axum::{
4    body::Bytes,
5    extract::{ws::WebSocketUpgrade, Path, State as AxumState},
6    http::StatusCode,
7    response::IntoResponse,
8    routing::{get, post},
9    Router,
10};
11use commonware_codec::{DecodeExt, Encode, EncodeSize, FixedSize, Write};
12use commonware_consensus::{types::View, Viewable};
13use commonware_cryptography::{sha256::Digest, Digestible};
14use commonware_parallel::Strategy;
15use commonware_utils::from_hex;
16use futures::{SinkExt, StreamExt};
17use std::{
18    collections::BTreeMap,
19    sync::{Arc, RwLock},
20};
21use tokio::sync::broadcast;
22use tower_http::cors::CorsLayer;
23
24#[derive(Default)]
25pub struct State {
26    seeds: BTreeMap<View, Seed>,
27    notarizations: BTreeMap<View, Notarized>,
28    finalizations: BTreeMap<View, Finalized>,
29    finalized_height_to_view: BTreeMap<u64, View>,
30    blocks_by_digest: BTreeMap<Digest, Block>,
31}
32
33#[derive(Clone)]
34pub struct Indexer<S: Strategy> {
35    scheme: Scheme,
36    state: Arc<RwLock<State>>,
37    consensus_tx: broadcast::Sender<Vec<u8>>,
38    strategy: S,
39}
40
41impl<S: Strategy> Indexer<S> {
42    pub fn new(scheme: Scheme, strategy: S) -> Self {
43        let (consensus_tx, _) = broadcast::channel(1024);
44        let state = Arc::new(RwLock::new(State::default()));
45
46        Self {
47            scheme,
48            state,
49            consensus_tx,
50            strategy,
51        }
52    }
53
54    pub fn submit_seed(&self, seed: Seed) -> Result<(), &'static str> {
55        // Verify signature with identity
56        if !seed.verify(&self.scheme) {
57            return Err("Invalid seed signature");
58        }
59
60        let mut state = self.state.write().unwrap();
61        if state.seeds.insert(seed.view(), seed.clone()).is_some() {
62            return Ok(()); // Already exists
63        }
64
65        // Broadcast seed
66        let mut data = vec![0u8; u8::SIZE + seed.encode_size()];
67        data[0] = Kind::Seed as u8;
68        seed.write(&mut data[1..].as_mut());
69        let _ = self.consensus_tx.send(data);
70        Ok(())
71    }
72
73    pub fn get_seed(&self, query: &str) -> Option<Seed> {
74        let state = self.state.read().unwrap();
75        if query == LATEST {
76            state.seeds.last_key_value().map(|(_, seed)| seed.clone())
77        } else {
78            // Parse as hex-encoded index
79            let raw = from_hex(query)?;
80            let index = u64::decode(raw.as_slice()).ok()?;
81            state.seeds.get(&View::new(index)).cloned()
82        }
83    }
84
85    pub fn submit_notarization(&self, notarized: Notarized) -> Result<(), &'static str> {
86        // Verify signature with identity
87        if !notarized.verify(&self.scheme, &self.strategy) {
88            return Err("Invalid notarization signature");
89        }
90
91        let mut state = self.state.write().unwrap();
92
93        // Store block by digest
94        state
95            .blocks_by_digest
96            .insert(notarized.block.digest(), notarized.block.clone());
97
98        // Store notarization
99        let view = notarized.proof.view();
100        if state
101            .notarizations
102            .insert(view, notarized.clone())
103            .is_some()
104        {
105            return Ok(()); // Already exists
106        }
107
108        // Broadcast notarization
109        let mut data = vec![0u8; u8::SIZE + notarized.encode_size()];
110        data[0] = Kind::Notarization as u8;
111        notarized.write(&mut data[1..].as_mut());
112        let _ = self.consensus_tx.send(data);
113        Ok(())
114    }
115
116    pub fn get_notarization(&self, query: &str) -> Option<Notarized> {
117        let state = self.state.read().unwrap();
118        if query == LATEST {
119            state.notarizations.last_key_value().map(|(_, n)| n.clone())
120        } else {
121            // Parse as hex-encoded index
122            let raw = from_hex(query)?;
123            let index = u64::decode(raw.as_slice()).ok()?;
124            state.notarizations.get(&View::new(index)).cloned()
125        }
126    }
127
128    pub fn submit_finalization(&self, finalized: Finalized) -> Result<(), &'static str> {
129        // Verify signature with identity
130        if !finalized.verify(&self.scheme, &self.strategy) {
131            return Err("Invalid finalization signature");
132        }
133
134        let mut state = self.state.write().unwrap();
135
136        // Store block by digest
137        state
138            .blocks_by_digest
139            .insert(finalized.block.digest(), finalized.block.clone());
140
141        // Store finalization
142        let view = finalized.proof.view();
143        if state
144            .finalizations
145            .insert(view, finalized.clone())
146            .is_some()
147        {
148            return Ok(()); // Already exists
149        }
150        state
151            .finalized_height_to_view
152            .insert(finalized.block.height.get(), view);
153
154        // Broadcast finalization
155        let mut data = vec![0u8; u8::SIZE + finalized.encode_size()];
156        data[0] = Kind::Finalization as u8;
157        finalized.write(&mut data[1..].as_mut());
158        let _ = self.consensus_tx.send(data);
159        Ok(())
160    }
161
162    pub fn get_finalization(&self, query: &str) -> Option<Finalized> {
163        let state = self.state.read().unwrap();
164        if query == LATEST {
165            state.finalizations.last_key_value().map(|(_, f)| f.clone())
166        } else {
167            // Parse as hex-encoded index
168            let raw = from_hex(query)?;
169            let index = u64::decode(raw.as_slice()).ok()?;
170            state.finalizations.get(&View::new(index)).cloned()
171        }
172    }
173
174    pub fn get_block(&self, query: &str) -> Option<BlockResult> {
175        let state = self.state.read().unwrap();
176
177        if query == LATEST {
178            // Return latest finalized block
179            state
180                .finalizations
181                .last_key_value()
182                .map(|(_, f)| BlockResult::Finalized(f.clone()))
183        } else if let Some(raw) = from_hex(query) {
184            // Try to parse as index (8 bytes)
185            if raw.len() == u64::SIZE {
186                let index = u64::decode(raw.as_slice()).ok()?;
187                state.finalized_height_to_view.get(&index).and_then(|view| {
188                    state
189                        .finalizations
190                        .get(view)
191                        .map(|f| BlockResult::Finalized(f.clone()))
192                })
193            } else if raw.len() == Digest::SIZE {
194                let digest = Digest::decode(raw.as_slice()).ok()?;
195                state
196                    .blocks_by_digest
197                    .get(&digest)
198                    .map(|b| BlockResult::Block(b.clone()))
199            } else {
200                None
201            }
202        } else {
203            None
204        }
205    }
206
207    pub fn consensus_subscriber(&self) -> broadcast::Receiver<Vec<u8>> {
208        self.consensus_tx.subscribe()
209    }
210}
211
212#[allow(clippy::large_enum_variant)]
213pub enum BlockResult {
214    Block(Block),
215    Finalized(Finalized),
216}
217
218pub struct Api<S: Strategy> {
219    indexer: Arc<Indexer<S>>,
220}
221
222impl<S: Strategy> Api<S> {
223    pub fn new(indexer: Arc<Indexer<S>>) -> Self {
224        Self { indexer }
225    }
226
227    pub fn router(self) -> Router {
228        Router::new()
229            .route("/health", get(health_check))
230            .route("/seed", post(seed_upload))
231            .route("/seed/{query}", get(seed_get))
232            .route("/notarization", post(notarization_upload))
233            .route("/notarization/{query}", get(notarization_get))
234            .route("/finalization", post(finalization_upload))
235            .route("/finalization/{query}", get(finalization_get))
236            .route("/block/{query}", get(block_get))
237            .route("/consensus/ws", get(consensus_ws))
238            .layer(CorsLayer::permissive())
239            .with_state(self.indexer)
240    }
241}
242
243async fn health_check() -> impl IntoResponse {
244    (StatusCode::OK, "ok")
245}
246
247async fn seed_upload<S: Strategy>(
248    AxumState(indexer): AxumState<Arc<Indexer<S>>>,
249    body: Bytes,
250) -> impl IntoResponse {
251    match Seed::decode(&mut body.as_ref()) {
252        Ok(seed) => match indexer.submit_seed(seed) {
253            Ok(_) => StatusCode::OK,
254            Err(_) => StatusCode::UNAUTHORIZED,
255        },
256        Err(_) => StatusCode::BAD_REQUEST,
257    }
258}
259
260async fn seed_get<S: Strategy>(
261    AxumState(indexer): AxumState<Arc<Indexer<S>>>,
262    Path(query): Path<String>,
263) -> impl IntoResponse {
264    match indexer.get_seed(&query) {
265        Some(seed) => (StatusCode::OK, seed.encode().to_vec()).into_response(),
266        None => StatusCode::NOT_FOUND.into_response(),
267    }
268}
269
270async fn notarization_upload<S: Strategy>(
271    AxumState(indexer): AxumState<Arc<Indexer<S>>>,
272    body: Bytes,
273) -> impl IntoResponse {
274    match Notarized::decode(&mut body.as_ref()) {
275        Ok(notarized) => match indexer.submit_notarization(notarized) {
276            Ok(_) => StatusCode::OK,
277            Err(_) => StatusCode::UNAUTHORIZED,
278        },
279        Err(_) => StatusCode::BAD_REQUEST,
280    }
281}
282
283async fn notarization_get<S: Strategy>(
284    AxumState(indexer): AxumState<Arc<Indexer<S>>>,
285    Path(query): Path<String>,
286) -> impl IntoResponse {
287    match indexer.get_notarization(&query) {
288        Some(notarized) => (StatusCode::OK, notarized.encode().to_vec()).into_response(),
289        None => StatusCode::NOT_FOUND.into_response(),
290    }
291}
292
293async fn finalization_upload<S: Strategy>(
294    AxumState(indexer): AxumState<Arc<Indexer<S>>>,
295    body: Bytes,
296) -> impl IntoResponse {
297    match Finalized::decode(&mut body.as_ref()) {
298        Ok(finalized) => match indexer.submit_finalization(finalized) {
299            Ok(_) => StatusCode::OK,
300            Err(_) => StatusCode::UNAUTHORIZED,
301        },
302        Err(_) => StatusCode::BAD_REQUEST,
303    }
304}
305
306async fn finalization_get<S: Strategy>(
307    AxumState(indexer): AxumState<Arc<Indexer<S>>>,
308    Path(query): Path<String>,
309) -> impl IntoResponse {
310    match indexer.get_finalization(&query) {
311        Some(finalized) => (StatusCode::OK, finalized.encode().to_vec()).into_response(),
312        None => StatusCode::NOT_FOUND.into_response(),
313    }
314}
315
316async fn block_get<S: Strategy>(
317    AxumState(indexer): AxumState<Arc<Indexer<S>>>,
318    Path(query): Path<String>,
319) -> impl IntoResponse {
320    match indexer.get_block(&query) {
321        Some(BlockResult::Block(block)) => {
322            (StatusCode::OK, block.encode().to_vec()).into_response()
323        }
324        Some(BlockResult::Finalized(finalized)) => {
325            (StatusCode::OK, finalized.encode().to_vec()).into_response()
326        }
327        None => StatusCode::NOT_FOUND.into_response(),
328    }
329}
330
331async fn consensus_ws<S: Strategy>(
332    AxumState(indexer): AxumState<Arc<Indexer<S>>>,
333    ws: WebSocketUpgrade,
334) -> impl IntoResponse {
335    ws.on_upgrade(move |socket| handle_consensus_ws(socket, indexer))
336}
337
338async fn handle_consensus_ws<S: Strategy>(
339    socket: axum::extract::ws::WebSocket,
340    indexer: Arc<Indexer<S>>,
341) {
342    let (mut sender, _receiver) = socket.split();
343    let mut consensus = indexer.consensus_subscriber();
344
345    while let Ok(data) = consensus.recv().await {
346        if sender
347            .send(axum::extract::ws::Message::Binary(data.into()))
348            .await
349            .is_err()
350        {
351            break;
352        }
353    }
354}
355
356#[cfg(test)]
357mod tests {
358    use super::*;
359    use alto_client::{Client, ClientBuilder, IndexQuery, Query};
360    use alto_types::{Context, Identity, Seedable, EPOCH, NAMESPACE};
361    use commonware_consensus::{
362        simplex::{
363            scheme::bls12381_threshold::vrf as bls12381_threshold,
364            types::{Finalization, Finalize, Notarization, Notarize, Proposal},
365        },
366        types::{Height, Round, View},
367        Viewable,
368    };
369    use commonware_cryptography::{
370        bls12381::primitives::variant::MinSig, certificate::mocks::Fixture, ed25519, sha256,
371        Digest, Digestible, Hasher, Sha256, Signer,
372    };
373    use commonware_parallel::Sequential;
374    use futures::StreamExt;
375    use rand::{rngs::StdRng, SeedableRng};
376    use rcgen::{generate_simple_self_signed, CertifiedKey, KeyPair};
377    use rustls::pki_types::{CertificateDer, PrivateKeyDer};
378    use std::net::SocketAddr;
379    use tokio::net::TcpListener;
380    use tokio_rustls::TlsAcceptor;
381    use tower::ServiceExt;
382
383    /// Test context containing common setup for indexer tests.
384    struct TestContext {
385        schemes: Vec<Scheme>,
386        client: Client<Sequential>,
387    }
388
389    impl TestContext {
390        /// Create a new test context with a running server and client.
391        async fn new() -> Self {
392            let mut rng = StdRng::seed_from_u64(0);
393            let Fixture { schemes, .. } =
394                bls12381_threshold::fixture::<MinSig, _>(&mut rng, NAMESPACE, 4);
395            let identity = *schemes[0].polynomial().public();
396
397            let (addr, _) = start_server(schemes[0].clone(), Sequential).await;
398            let client = Client::new(&format!("http://{addr}"), identity, Sequential);
399            wait_for_ready(&client).await;
400
401            Self { schemes, client }
402        }
403
404        /// Create a test block with standard parameters.
405        fn test_block(&self) -> Block {
406            let context = Context {
407                round: Round::new(EPOCH, View::new(1)),
408                leader: ed25519::PrivateKey::from_seed(0).public_key(),
409                parent: (View::new(0), sha256::Digest::EMPTY),
410            };
411            Block::new(context, Sha256::hash(b"genesis"), Height::new(1), 1000)
412        }
413
414        /// Create a proposal for the given block at view 1.
415        fn proposal(&self, block: &Block) -> Proposal<sha256::Digest> {
416            Proposal::new(
417                Round::new(EPOCH, View::new(1)),
418                View::new(0),
419                block.digest(),
420            )
421        }
422
423        /// Create a seed by first creating a notarization.
424        fn seed(&self) -> Seed {
425            let block = self.test_block();
426            let proposal = self.proposal(&block);
427            create_notarization(&self.schemes, proposal).seed()
428        }
429
430        /// Create a notarized block.
431        fn notarized(&self) -> Notarized {
432            let block = self.test_block();
433            let proposal = self.proposal(&block);
434            Notarized::new(create_notarization(&self.schemes, proposal), block)
435        }
436
437        /// Create a finalized block.
438        fn finalized(&self) -> Finalized {
439            let block = self.test_block();
440            let proposal = self.proposal(&block);
441            Finalized::new(create_finalization(&self.schemes, proposal), block)
442        }
443    }
444
445    fn create_notarization(
446        schemes: &[Scheme],
447        proposal: Proposal<sha256::Digest>,
448    ) -> alto_types::Notarization {
449        let notarizes: Vec<_> = schemes
450            .iter()
451            .map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
452            .collect();
453        Notarization::from_notarizes(&schemes[0], &notarizes, &Sequential).unwrap()
454    }
455
456    fn create_finalization(
457        schemes: &[Scheme],
458        proposal: Proposal<sha256::Digest>,
459    ) -> alto_types::Finalization {
460        let finalizes: Vec<_> = schemes
461            .iter()
462            .map(|scheme| Finalize::sign(scheme, proposal.clone()).unwrap())
463            .collect();
464        Finalization::from_finalizes(&schemes[0], &finalizes, &Sequential).unwrap()
465    }
466
467    async fn start_server(
468        scheme: Scheme,
469        strategy: impl Strategy,
470    ) -> (SocketAddr, tokio::task::JoinHandle<()>) {
471        let indexer = Arc::new(Indexer::new(scheme, strategy));
472        let api = Api::new(indexer);
473        let app = api.router();
474
475        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
476        let addr = listener.local_addr().unwrap();
477
478        let handle = tokio::spawn(async move {
479            axum::serve(listener, app).await.unwrap();
480        });
481
482        (addr, handle)
483    }
484
485    async fn wait_for_ready(client: &Client<Sequential>) {
486        loop {
487            if client.health().await.is_ok() {
488                return;
489            }
490            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
491        }
492    }
493
494    fn fixture(seed: u64) -> (Vec<Scheme>, Identity) {
495        let mut rng = StdRng::seed_from_u64(seed);
496        let Fixture { schemes, .. } =
497            bls12381_threshold::fixture::<MinSig, _>(&mut rng, NAMESPACE, 4);
498        let identity = *schemes[0].polynomial().public();
499        (schemes, identity)
500    }
501
502    #[tokio::test]
503    async fn test_seed_operations() {
504        let ctx = TestContext::new().await;
505        let seed = ctx.seed();
506
507        ctx.client.seed_upload(seed.clone()).await.unwrap();
508
509        let retrieved = ctx.client.seed_get(IndexQuery::Latest).await.unwrap();
510        assert_eq!(retrieved.view(), seed.view());
511
512        let retrieved = ctx.client.seed_get(IndexQuery::Index(1)).await.unwrap();
513        assert_eq!(retrieved.view().get(), 1);
514    }
515
516    #[tokio::test]
517    async fn test_notarization_operations() {
518        let ctx = TestContext::new().await;
519        let notarized = ctx.notarized();
520
521        ctx.client.notarized_upload(notarized).await.unwrap();
522
523        let retrieved = ctx.client.notarized_get(IndexQuery::Latest).await.unwrap();
524        assert_eq!(retrieved.proof.view().get(), 1);
525
526        let retrieved = ctx
527            .client
528            .notarized_get(IndexQuery::Index(1))
529            .await
530            .unwrap();
531        assert_eq!(retrieved.proof.view().get(), 1);
532    }
533
534    #[tokio::test]
535    async fn test_finalization_operations() {
536        let ctx = TestContext::new().await;
537        let finalized = ctx.finalized();
538
539        ctx.client.finalized_upload(finalized).await.unwrap();
540
541        let retrieved = ctx.client.finalized_get(IndexQuery::Latest).await.unwrap();
542        assert_eq!(retrieved.proof.view().get(), 1);
543
544        let retrieved = ctx
545            .client
546            .finalized_get(IndexQuery::Index(1))
547            .await
548            .unwrap();
549        assert_eq!(retrieved.proof.view().get(), 1);
550    }
551
552    #[tokio::test]
553    async fn test_block_retrieval() {
554        let ctx = TestContext::new().await;
555        let block = ctx.test_block();
556        let finalized = ctx.finalized();
557
558        ctx.client.finalized_upload(finalized).await.unwrap();
559
560        // Test retrieval by latest
561        let payload = ctx.client.block_get(Query::Latest).await.unwrap();
562        match payload {
563            alto_client::consensus::Payload::Finalized(f) => {
564                assert_eq!(f.block.height.get(), 1);
565            }
566            _ => panic!("Expected finalized block"),
567        }
568
569        // Test retrieval by index
570        let payload = ctx.client.block_get(Query::Index(1)).await.unwrap();
571        match payload {
572            alto_client::consensus::Payload::Finalized(f) => {
573                assert_eq!(f.block.height.get(), 1);
574            }
575            _ => panic!("Expected finalized block"),
576        }
577
578        // Test retrieval by digest
579        let payload = ctx
580            .client
581            .block_get(Query::Digest(block.digest()))
582            .await
583            .unwrap();
584        match payload {
585            alto_client::consensus::Payload::Block(b) => {
586                assert_eq!(b.digest(), block.digest());
587            }
588            _ => panic!("Expected block"),
589        }
590    }
591
592    #[tokio::test]
593    async fn test_websocket_streaming() {
594        let ctx = TestContext::new().await;
595        let seed = ctx.seed();
596
597        let mut stream = ctx.client.listen().await.unwrap();
598
599        // Signal that websocket is connected, then upload the seed
600        let (tx, rx) = tokio::sync::oneshot::channel();
601        let client = ctx.client.clone();
602        tokio::spawn(async move {
603            rx.await.unwrap();
604            client.seed_upload(seed).await.unwrap();
605        });
606
607        // Signal ready and wait for the seed message
608        tx.send(()).unwrap();
609        if let Some(Ok(msg)) = stream.next().await {
610            match msg {
611                alto_client::consensus::Message::Seed(s) => {
612                    assert_eq!(s.view().get(), 1);
613                }
614                _ => panic!("Expected seed message"),
615            }
616        } else {
617            panic!("Expected to receive a message");
618        }
619    }
620
621    #[tokio::test]
622    async fn test_identity_verification() {
623        // Create two different fixtures
624        let (schemes1, _) = fixture(0);
625        let (_, identity2) = fixture(1);
626
627        // Start server with schemes1, but create client expecting identity2
628        let (addr, _handle) = start_server(schemes1[0].clone(), Sequential).await;
629        let client = Client::new(&format!("http://{addr}"), identity2, Sequential);
630        wait_for_ready(&client).await;
631
632        // Create a seed signed by schemes1
633        let context = Context {
634            round: Round::new(EPOCH, View::new(1)),
635            leader: ed25519::PrivateKey::from_seed(0).public_key(),
636            parent: (View::new(0), sha256::Digest::EMPTY),
637        };
638        let block = Block::new(context, Sha256::hash(b"genesis"), Height::new(1), 1000);
639        let proposal = Proposal::new(
640            Round::new(EPOCH, View::new(1)),
641            View::new(0),
642            block.digest(),
643        );
644        let seed = create_notarization(&schemes1, proposal).seed();
645
646        // Server accepts it (signed by schemes1, which server uses)
647        client.seed_upload(seed).await.unwrap();
648
649        // Client fails to verify (expects identity2 but seed is signed by schemes1)
650        let result = client.seed_get(IndexQuery::Latest).await;
651        assert!(result.is_err());
652    }
653
654    #[tokio::test]
655    async fn test_invalid_signature_rejection() {
656        let ctx = TestContext::new().await;
657
658        // Create different schemes (wrong ones)
659        let (wrong_schemes, _) = fixture(1);
660
661        // Create a seed with wrong schemes
662        let block = ctx.test_block();
663        let proposal = ctx.proposal(&block);
664        let bad_seed = create_notarization(&wrong_schemes, proposal).seed();
665
666        // Server rejects it (signature doesn't match server's identity)
667        let result = ctx.client.seed_upload(bad_seed).await;
668        assert!(result.is_err());
669    }
670
671    fn generate_self_signed_cert() -> CertifiedKey<KeyPair> {
672        let subject_alt_names = vec!["localhost".to_string(), "127.0.0.1".to_string()];
673        generate_simple_self_signed(subject_alt_names).unwrap()
674    }
675
676    async fn start_tls_server(
677        scheme: Scheme,
678        cert_key: &CertifiedKey<KeyPair>,
679        strategy: impl Strategy,
680    ) -> (SocketAddr, tokio::task::JoinHandle<()>) {
681        let indexer = Arc::new(Indexer::new(scheme, strategy));
682        let api = Api::new(indexer);
683        let app = api.router();
684
685        // Create rustls server config
686        let cert_der = CertificateDer::from(cert_key.cert.der().to_vec());
687        let key_der = PrivateKeyDer::try_from(cert_key.signing_key.serialize_der()).unwrap();
688
689        let server_config = rustls::ServerConfig::builder_with_provider(Arc::new(
690            rustls::crypto::aws_lc_rs::default_provider(),
691        ))
692        .with_safe_default_protocol_versions()
693        .unwrap()
694        .with_no_client_auth()
695        .with_single_cert(vec![cert_der], key_der)
696        .expect("Failed to create server config");
697        let tls_acceptor = TlsAcceptor::from(Arc::new(server_config));
698
699        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
700        let addr = listener.local_addr().unwrap();
701
702        let handle = tokio::spawn(async move {
703            loop {
704                let (stream, _) = listener.accept().await.unwrap();
705                let tls_acceptor = tls_acceptor.clone();
706                let app = app.clone();
707
708                tokio::spawn(async move {
709                    let tls_stream = match tls_acceptor.accept(stream).await {
710                        Ok(s) => s,
711                        Err(_) => return,
712                    };
713
714                    let io = hyper_util::rt::TokioIo::new(tls_stream);
715                    let service = hyper::service::service_fn(move |req| {
716                        let app = app.clone();
717                        async move { app.oneshot(req).await }
718                    });
719                    let _ = hyper_util::server::conn::auto::Builder::new(
720                        hyper_util::rt::TokioExecutor::new(),
721                    )
722                    .serve_connection_with_upgrades(io, service)
723                    .await;
724                });
725            }
726        });
727
728        (addr, handle)
729    }
730
731    fn create_tls_client(
732        addr: SocketAddr,
733        identity: Identity,
734        cert_key: &CertifiedKey<KeyPair>,
735    ) -> Client<Sequential> {
736        ClientBuilder::new(&format!("https://{addr}"), identity, Sequential)
737            .with_tls_cert(cert_key.cert.der().to_vec())
738            .build()
739    }
740
741    #[tokio::test]
742    async fn test_tls_https_connection() {
743        let cert_key = generate_self_signed_cert();
744
745        let mut rng = StdRng::seed_from_u64(0);
746        let Fixture { schemes, .. } =
747            bls12381_threshold::fixture::<MinSig, _>(&mut rng, NAMESPACE, 4);
748        let identity = *schemes[0].polynomial().public();
749
750        let (addr, handle) = start_tls_server(schemes[0].clone(), &cert_key, Sequential).await;
751        let client = create_tls_client(addr, identity, &cert_key);
752        wait_for_ready(&client).await;
753
754        // Create and upload a seed
755        let context = Context {
756            round: Round::new(EPOCH, View::new(1)),
757            leader: ed25519::PrivateKey::from_seed(0).public_key(),
758            parent: (View::new(0), sha256::Digest::EMPTY),
759        };
760        let block = Block::new(context, Sha256::hash(b"genesis"), Height::new(1), 1000);
761        let proposal = Proposal::new(
762            Round::new(EPOCH, View::new(1)),
763            View::new(0),
764            block.digest(),
765        );
766        let seed = create_notarization(&schemes, proposal).seed();
767
768        // Test HTTPS POST
769        client.seed_upload(seed.clone()).await.unwrap();
770
771        // Test HTTPS GET
772        let retrieved = client.seed_get(IndexQuery::Latest).await.unwrap();
773        assert_eq!(retrieved.view(), seed.view());
774
775        handle.abort();
776    }
777
778    #[tokio::test]
779    async fn test_tls_websocket_connection() {
780        let cert_key = generate_self_signed_cert();
781
782        let mut rng = StdRng::seed_from_u64(0);
783        let Fixture { schemes, .. } =
784            bls12381_threshold::fixture::<MinSig, _>(&mut rng, NAMESPACE, 4);
785        let identity = *schemes[0].polynomial().public();
786
787        let (addr, handle) = start_tls_server(schemes[0].clone(), &cert_key, Sequential).await;
788        let client = create_tls_client(addr, identity, &cert_key);
789        wait_for_ready(&client).await;
790
791        // Create a seed
792        let context = Context {
793            round: Round::new(EPOCH, View::new(1)),
794            leader: ed25519::PrivateKey::from_seed(0).public_key(),
795            parent: (View::new(0), sha256::Digest::EMPTY),
796        };
797        let block = Block::new(context, Sha256::hash(b"genesis"), Height::new(1), 1000);
798        let proposal = Proposal::new(
799            Round::new(EPOCH, View::new(1)),
800            View::new(0),
801            block.digest(),
802        );
803        let seed = create_notarization(&schemes, proposal).seed();
804
805        // Connect to WebSocket over TLS
806        let mut stream = client.listen().await.unwrap();
807
808        // Signal that websocket is connected, then upload the seed
809        let (tx, rx) = tokio::sync::oneshot::channel();
810        let upload_client = client.clone();
811        tokio::spawn(async move {
812            rx.await.unwrap();
813            upload_client.seed_upload(seed).await.unwrap();
814        });
815
816        // Signal ready and wait for the seed message
817        tx.send(()).unwrap();
818        if let Some(Ok(msg)) = stream.next().await {
819            match msg {
820                alto_client::consensus::Message::Seed(s) => {
821                    assert_eq!(s.view().get(), 1);
822                }
823                _ => panic!("Expected seed message"),
824            }
825        } else {
826            panic!("Expected to receive a message");
827        }
828
829        handle.abort();
830    }
831}