Skip to main content

linera_faucet_server/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4#![recursion_limit = "512"]
5
6//! The server component of the Linera faucet.
7
8mod database;
9
10use std::{
11    collections::VecDeque, future::IntoFuture, net::SocketAddr, path::PathBuf, sync::Arc,
12    time::Instant,
13};
14
15use anyhow::Context as _;
16use async_graphql::{EmptySubscription, Error, Schema, SimpleObject};
17use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
18use axum::{Extension, Router};
19use futures::{lock::Mutex, FutureExt as _};
20#[cfg(with_metrics)]
21use linera_base::prometheus_util::MeasureLatency as _;
22use linera_base::{
23    bcs,
24    crypto::{CryptoHash, ValidatorPublicKey},
25    data_types::{Amount, ApplicationPermissions, ChainDescription, Epoch, TimeDelta, Timestamp},
26    identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId},
27    ownership::ChainOwnership,
28};
29use linera_chain::{types::ConfirmedBlockCertificate, ChainError, ChainExecutionContext};
30use linera_client::{
31    chain_listener::{ChainListener, ChainListenerConfig, ClientContext},
32    config::GenesisConfig,
33};
34use linera_core::{
35    client::{chain_client, ChainClient},
36    data_types::ClientOutcome,
37    worker::WorkerError,
38    LocalNodeError,
39};
40use linera_execution::{
41    system::{OpenChainConfig, SystemOperation},
42    Committee, ExecutionError, Operation,
43};
44#[cfg(feature = "metrics")]
45use linera_metrics::monitoring_server;
46use linera_storage::{Clock as _, Storage};
47use serde::Deserialize;
48use tokio::sync::{oneshot, Notify};
49use tokio_util::sync::CancellationToken;
50use tower_http::cors::CorsLayer;
51use tracing::info;
52
53use crate::database::FaucetDatabase;
54
// Prometheus metrics for the faucet
//
// The whole module only exists when the `with_metrics` cfg is set; every use site in
// this file is guarded by the same cfg. Each static is registered lazily on first access.
#[cfg(with_metrics)]
mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
        register_int_gauge_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec, IntGaugeVec};

    /// Counter of claim requests, labeled by `result`.
    ///
    /// Label values used in this file: `"success"`, `"error"` and `"duplicate"`.
    pub static CLAIM_REQUESTS_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "faucet_claim_requests_total",
            "Total number of claim requests by result",
            &["result"],
        )
    });

    /// Histogram of end-to-end claim latency in milliseconds, labeled by `result`.
    pub static CLAIM_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "faucet_claim_latency_ms",
            "End-to-end latency of claim requests in milliseconds",
            &["result"],
            exponential_bucket_interval(10.0, 40_000.0),
        )
    });

    /// Counter of chains created by the faucet (no labels).
    pub static CHAINS_CREATED_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "faucet_chains_created_total",
            "Total number of chains created by the faucet",
            &[],
        )
    });

    /// Histogram of the number of requests per processed batch, with explicit buckets.
    pub static BATCH_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "faucet_batch_size",
            "Number of chain creation requests per batch",
            &[],
            Some(vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0]),
        )
    });

    /// Histogram of the time taken to process one batch in milliseconds, labeled by `result`.
    pub static BATCH_PROCESSING_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "faucet_batch_processing_latency_ms",
            "Time to process a batch of chain creation requests in milliseconds",
            &["result"],
            exponential_bucket_interval(10.0, 40_000.0),
        )
    });

    /// Histogram of the pending-request queue length, with explicit buckets.
    pub static QUEUE_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "faucet_queue_size",
            "Number of pending claim requests in the queue",
            &[],
            Some(vec![
                0.0, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0,
            ]),
        )
    });

    /// Histogram of the time a request waits in the queue, in milliseconds.
    pub static QUEUE_WAIT_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "faucet_queue_wait_time_ms",
            "Time a request spends in the queue before processing in milliseconds",
            &[],
            exponential_bucket_interval(10.0, 40_000.0),
        )
    });

    /// Gauge holding the current balance of the faucet chain.
    pub static FAUCET_BALANCE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
        register_int_gauge_vec(
            "faucet_balance_amount",
            "Current balance of the faucet chain",
            &[],
        )
    });

    /// Counter of requests rejected by the rate limiter.
    pub static RATE_LIMIT_REJECTIONS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "faucet_rate_limit_rejections_total",
            "Number of requests rejected due to rate limiting",
            &[],
        )
    });

    /// Counter of requests rejected because the faucet balance was too low.
    pub static INSUFFICIENT_BALANCE_REJECTIONS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "faucet_insufficient_balance_rejections_total",
            "Number of requests rejected due to insufficient faucet balance",
            &[],
        )
    });

    /// Histogram of faucet database operation latency in milliseconds, labeled by
    /// `operation` (this file uses `"get_chain_id"`).
    pub static DATABASE_OPERATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "faucet_database_operation_latency_ms",
            "Database operation latency in milliseconds",
            &["operation"],
            exponential_bucket_interval(0.5, 2000.0),
        )
    });

    /// Counter of retryable chain execution errors, labeled by `error_type`.
    pub static RETRYABLE_ERRORS: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "faucet_retryable_errors_total",
            "Number of chain execution retryable errors by type",
            &["error_type"],
        )
    });

    /// Histogram of the latency of storing chain information in the database, in
    /// milliseconds.
    pub static STORE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "faucet_store_chain_latency_ms",
            "Latency of storing chain information in the database in milliseconds",
            &[],
            exponential_bucket_interval(10.0, 2000.0),
        )
    });
}
179
180/// Returns an HTML response constructing the GraphiQL web page for the given URI.
181pub(crate) async fn graphiql(uri: axum::http::Uri) -> impl axum::response::IntoResponse {
182    axum::response::Html(
183        async_graphql::http::GraphiQLSource::build()
184            .endpoint(uri.path())
185            .subscription_endpoint("/ws")
186            .finish(),
187    )
188}
189
190#[cfg(test)]
191mod tests;
192
/// The root GraphQL query type.
pub struct QueryRoot<C: ClientContext> {
    /// Chain client used to answer the committee, validator and epoch queries.
    client: ChainClient<C::Environment>,
    /// Genesis configuration; serialized to JSON by the `genesis_config` query.
    genesis_config: Arc<GenesisConfig>,
    /// Faucet database consulted for chain IDs and claim records.
    faucet_storage: Arc<FaucetDatabase>,
}
199
/// The root GraphQL mutation type.
///
/// Mutations do not execute operations themselves: they push a [`PendingRequest`] onto
/// `pending_requests`, signal `request_notifier`, and wait for the response on a
/// oneshot channel.
pub struct MutationRoot<S> {
    /// Faucet database used to look up existing chains and claim records.
    faucet_storage: Arc<FaucetDatabase>,
    /// Queue of requests waiting to be processed.
    // NOTE(review): presumably the same queue instance the `BatchProcessor` drains; the
    // wiring is outside this part of the file.
    pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
    /// Signaled once after every enqueued request.
    request_notifier: Arc<Notify>,
    /// Storage used to read chain description blobs and the current time.
    storage: S,
    /// Amount for initial claims (chain creation).
    initial_claim_amount: Amount,
    /// Amount for daily claims (token transfer).
    daily_claim_amount: Amount,
}
211
// NOTE(review): this type derives `SimpleObject`, so the `///` comments below presumably
// become GraphQL schema descriptions — confirm before rewording them.
/// The result of a successful `claim` or `dailyClaim` mutation.
#[derive(Clone, Debug, SimpleObject)]
pub struct ClaimOutcome {
    /// The ID of the chain.
    pub chain_id: ChainId,
    /// The hash of the certificate containing the operation.
    pub certificate_hash: CryptoHash,
    /// The amount of tokens transferred.
    pub amount: Amount,
}
222
// NOTE(review): this type derives `SimpleObject`, so the `///` comments below presumably
// become GraphQL schema descriptions — confirm before rewording them.
/// Information about the initial chain claim.
#[derive(Clone, Debug, SimpleObject)]
pub struct InitialClaim {
    /// The chain ID that was created.
    pub chain_id: ChainId,
    /// The block timestamp when the chain was created.
    pub timestamp: Timestamp,
}
231
// A validator of the current committee, as returned by the `current_validators` query.
//
// NOTE(review): plain `//` comments are used on purpose. This type derives
// `SimpleObject`, so `///` comments would presumably be exported as GraphQL schema
// descriptions — confirm before adding doc comments here.
#[derive(Debug, Deserialize, SimpleObject)]
pub struct Validator {
    // The validator's public key (the key of the committee's validator map).
    pub public_key: ValidatorPublicKey,
    // The network address recorded for the validator in the committee.
    pub network_address: String,
}
237
/// The response from a pending request.
///
/// The variant mirrors the kind of request that was queued; a requester that receives
/// the other variant reports an "Unexpected response type" error.
#[derive(Debug)]
enum PendingResponse {
    /// Initial claim: returns the full chain description.
    Initial(Result<Box<ChainDescription>, Error>),
    /// Daily claim: returns the claim outcome.
    Daily(Result<ClaimOutcome, Error>),
}
246
/// A pending chain creation or token transfer request.
///
/// If `target_chain_id` is `None`, this is an initial claim (creates a new chain).
/// If `target_chain_id` is `Some`, this is a daily claim (transfers tokens).
#[derive(Debug)]
struct PendingRequest {
    /// The account owner the claim is made for.
    owner: AccountOwner,
    /// For daily claims, the existing chain to transfer tokens to.
    target_chain_id: Option<ChainId>,
    /// The amount of tokens to send.
    amount: Amount,
    /// For daily claims, the period number to store. Initial claims set this to `0`.
    daily_period: u64,
    /// Channel on which the result is sent back to the waiting mutation.
    responder: oneshot::Sender<PendingResponse>,
    /// Time at which the request was pushed onto the queue (metrics builds only).
    #[cfg(with_metrics)]
    queued_at: std::time::Instant,
}
264
265impl PendingRequest {
266    fn is_daily(&self) -> bool {
267        self.target_chain_id.is_some()
268    }
269
270    fn send_err(self, err: Error) {
271        let response = if self.is_daily() {
272            PendingResponse::Daily(Err(err))
273        } else {
274            PendingResponse::Initial(Err(err))
275        };
276        if self.responder.send(response).is_err() {
277            tracing::warn!("Receiver dropped while sending error to {}.", self.owner);
278        }
279    }
280}
281
/// Configuration for the batch processor.
///
/// The timestamps and the start balance parameterize the rate limiter; see
/// `BatchProcessor::check_rate_limiting`.
#[derive(Clone)]
struct BatchProcessorConfig {
    /// End of the rate-limiting window.
    end_timestamp: Timestamp,
    /// Start of the rate-limiting window.
    start_timestamp: Timestamp,
    /// Balance used as the reference for the rate limit.
    // NOTE(review): presumably the faucet balance at `start_timestamp` — confirm where
    // the config is built.
    start_balance: Amount,
    /// Maximum number of requests collected into a single batch.
    max_batch_size: usize,
}
290
/// Batching coordinator for processing chain creation requests.
///
/// Daily-claim token transfers go through the same queue and are batched together with
/// chain creations.
struct BatchProcessor<C: ClientContext> {
    /// Rate-limiting and batch-size settings.
    config: BatchProcessorConfig,
    /// Client context; locked to update the wallet after operations were executed.
    context: Arc<Mutex<C>>,
    /// Client of the faucet chain, used to execute operations and read the balance.
    client: ChainClient<C::Environment>,
    /// Faucet database used to validate queued requests.
    faucet_storage: Arc<FaucetDatabase>,
    /// Queue of requests waiting to be processed.
    pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
    /// Wakes the processing loop in `run`.
    request_notifier: Arc<Notify>,
}
300
301#[async_graphql::Object(cache_control(no_cache))]
302impl<C> QueryRoot<C>
303where
304    C: ClientContext + 'static,
305{
306    /// Returns the version information on this faucet service.
307    async fn version(&self) -> linera_version::VersionInfo {
308        linera_version::VersionInfo::default()
309    }
310
311    /// Returns the genesis config.
312    async fn genesis_config(&self) -> Result<serde_json::Value, Error> {
313        Ok(serde_json::to_value(&*self.genesis_config)?)
314    }
315
316    /// Returns the current committee's validators.
317    async fn current_validators(&self) -> Result<Vec<Validator>, Error> {
318        let committee = self.client.local_committee().await?;
319        Ok(committee
320            .validators()
321            .iter()
322            .map(|(public_key, validator)| Validator {
323                public_key: *public_key,
324                network_address: validator.network_address.clone(),
325            })
326            .collect())
327    }
328
329    /// Returns the current committee, including weights and resource policy.
330    async fn current_committee(&self) -> Result<Committee, Error> {
331        Ok((*self.client.local_committee().await?).clone())
332    }
333
334    /// Returns the current epoch of the faucet's chain.
335    async fn current_epoch(&self) -> Result<Epoch, Error> {
336        let info = self.client.chain_info().await?;
337        Ok(info.epoch)
338    }
339
340    /// Finds the existing chain with the given authentication key, if any.
341    async fn chain_id(&self, owner: AccountOwner) -> Result<ChainId, Error> {
342        // Check if this owner already has a chain.
343        #[cfg(with_metrics)]
344        let histogram = metrics::DATABASE_OPERATION_LATENCY.with_label_values(&["get_chain_id"]);
345        #[cfg(with_metrics)]
346        let _latency = histogram.measure_latency();
347
348        let chain_id = self.faucet_storage.get_chain_id(&owner).await?;
349
350        chain_id.ok_or_else(|| Error::new("This user has no chain yet"))
351    }
352
353    /// Returns the initial claim for the given owner, if any.
354    async fn initial_claim(&self, owner: AccountOwner) -> Result<Option<InitialClaim>, Error> {
355        let claim_record = self.faucet_storage.initial_claim(&owner).await?;
356
357        Ok(claim_record.map(|r| InitialClaim {
358            chain_id: r.chain_id,
359            timestamp: r.timestamp,
360        }))
361    }
362
363    /// Returns the earliest time at which the owner can make a daily claim.
364    /// If the returned timestamp is in the past (or now), the user can claim immediately.
365    /// Returns `None` if the user has not yet completed the initial claim.
366    async fn next_daily_claim(&self, owner: AccountOwner) -> Result<Option<Timestamp>, Error> {
367        let initial_claim = match self.faucet_storage.initial_claim(&owner).await? {
368            Some(record) => record,
369            None => return Ok(None),
370        };
371
372        let initial_micros = initial_claim.timestamp.micros();
373
374        // The initial claim counts as "a claim in period 0".
375        let last_period = self
376            .faucet_storage
377            .last_daily_claim_period(&owner)
378            .await?
379            .unwrap_or(0);
380
381        // Next available at start of the next period.
382        Ok(Some(Timestamp::from(
383            initial_micros + (last_period + 1) * DAILY_PERIOD_MICROS,
384        )))
385    }
386}
387
/// Duration of one daily claim period in microseconds (24 hours).
///
/// Daily claim periods are counted in multiples of this duration, starting at the
/// timestamp of the owner's initial claim.
const DAILY_PERIOD_MICROS: u64 = TimeDelta::from_secs(24 * 60 * 60).as_micros();
390
391/// Computes the current daily claim period from timestamps.
392fn current_daily_period(initial_claim_micros: u64, now_micros: u64) -> u64 {
393    now_micros.saturating_sub(initial_claim_micros) / DAILY_PERIOD_MICROS
394}
395
396/// Executes a future and records its latency in [`metrics::CLAIM_LATENCY`], labeled by outcome.
397async fn record_claim_latency<T>(
398    future: impl std::future::Future<Output = Result<T, Error>>,
399) -> Result<T, Error> {
400    #[cfg(with_metrics)]
401    let start_time = std::time::Instant::now();
402
403    let result = future.await;
404
405    #[cfg(with_metrics)]
406    {
407        let label = if result.is_ok() { "success" } else { "error" };
408        metrics::CLAIM_LATENCY
409            .with_label_values(&[label])
410            .observe(start_time.elapsed().as_secs_f64() * 1000.0);
411    }
412
413    result
414}
415
416#[async_graphql::Object(cache_control(no_cache))]
417impl<S> MutationRoot<S>
418where
419    S: Storage + Send + Sync + 'static,
420{
421    /// Creates a new chain with the given authentication key, and transfers tokens to it.
422    async fn claim(&self, owner: AccountOwner) -> Result<ChainDescription, Error> {
423        record_claim_latency(self.do_claim(owner)).await
424    }
425
426    /// Transfers a daily amount of tokens to the user's existing chain.
427    /// The user must have already claimed a chain. Each user can claim once per 24-hour
428    /// period, measured from their initial claim time.
429    async fn daily_claim(&self, owner: AccountOwner) -> Result<ClaimOutcome, Error> {
430        record_claim_latency(self.do_daily_claim(owner)).await
431    }
432}
433
434impl<S> MutationRoot<S>
435where
436    S: Storage + Send + Sync + 'static,
437{
438    async fn do_claim(&self, owner: AccountOwner) -> Result<ChainDescription, Error> {
439        // Check if this owner already has a chain.
440        #[cfg(with_metrics)]
441        let histogram = metrics::DATABASE_OPERATION_LATENCY.with_label_values(&["get_chain_id"]);
442
443        tracing::debug!(account_owner=?owner, "claim request received");
444        #[cfg(with_metrics)]
445        let _latency = histogram.measure_latency();
446
447        let existing_chain_id = self.faucet_storage.get_chain_id(&owner).await?;
448
449        if let Some(existing_chain_id) = existing_chain_id {
450            #[cfg(with_metrics)]
451            metrics::CLAIM_REQUESTS_TOTAL
452                .with_label_values(&["duplicate"])
453                .inc();
454
455            let stored_chain =
456                get_chain_description_from_storage(&self.storage, existing_chain_id).await;
457            tracing::debug!(account_owner=?owner, "claim request processed; returning pre-existing chain");
458            return stored_chain;
459        }
460
461        // Create a oneshot channel to receive the result.
462        let (tx, rx) = oneshot::channel();
463
464        // Add request to the queue.
465        {
466            let mut requests = self.pending_requests.lock().await;
467            requests.push_back(PendingRequest {
468                owner,
469                target_chain_id: None,
470                amount: self.initial_claim_amount,
471                daily_period: 0,
472                responder: tx,
473                #[cfg(with_metrics)]
474                queued_at: std::time::Instant::now(),
475            });
476
477            #[cfg(with_metrics)]
478            metrics::QUEUE_SIZE
479                .with_label_values(&[])
480                .observe(requests.len() as f64);
481        }
482
483        // Notify the batch processor that there's a new request.
484        self.request_notifier.notify_one();
485
486        // Wait for the result
487        let response = rx
488            .await
489            .map_err(|_| Error::new("Request processing was cancelled"))?;
490
491        #[cfg(with_metrics)]
492        {
493            let label = match &response {
494                PendingResponse::Initial(Ok(_)) => "success",
495                _ => "error",
496            };
497            metrics::CLAIM_REQUESTS_TOTAL
498                .with_label_values(&[label])
499                .inc();
500        }
501
502        tracing::debug!(account_owner=?owner, "claim request processed; new chain created");
503        match response {
504            PendingResponse::Initial(result) => result.map(|b| *b),
505            PendingResponse::Daily(_) => Err(Error::new("Unexpected response type")),
506        }
507    }
508
509    async fn do_daily_claim(&self, owner: AccountOwner) -> Result<ClaimOutcome, Error> {
510        if self.daily_claim_amount == Amount::ZERO {
511            return Err(Error::new("Daily claims are not enabled on this faucet"));
512        }
513
514        // The user must have done the initial claim first.
515        let initial_claim = self
516            .faucet_storage
517            .initial_claim(&owner)
518            .await?
519            .ok_or_else(|| Error::new("You must claim a chain before making daily claims"))?;
520
521        let now = self.storage.clock().current_time();
522        let period = current_daily_period(initial_claim.timestamp.micros(), now.micros());
523        let last_period = self
524            .faucet_storage
525            .last_daily_claim_period(&owner)
526            .await?
527            .unwrap_or(0);
528
529        if period <= last_period {
530            return Err(Error::new(
531                "You have already claimed tokens for this period",
532            ));
533        }
534
535        self.enqueue_daily_request(
536            owner,
537            initial_claim.chain_id,
538            self.daily_claim_amount,
539            period,
540        )
541        .await
542    }
543
544    async fn enqueue_daily_request(
545        &self,
546        owner: AccountOwner,
547        target_chain_id: ChainId,
548        amount: Amount,
549        daily_period: u64,
550    ) -> Result<ClaimOutcome, Error> {
551        // Create a oneshot channel to receive the result.
552        let (tx, rx) = oneshot::channel();
553
554        // Add request to the queue.
555        {
556            let mut requests = self.pending_requests.lock().await;
557            requests.push_back(PendingRequest {
558                owner,
559                target_chain_id: Some(target_chain_id),
560                amount,
561                daily_period,
562                responder: tx,
563                #[cfg(with_metrics)]
564                queued_at: std::time::Instant::now(),
565            });
566
567            #[cfg(with_metrics)]
568            metrics::QUEUE_SIZE
569                .with_label_values(&[])
570                .observe(requests.len() as f64);
571        }
572
573        // Notify the batch processor that there's a new request.
574        self.request_notifier.notify_one();
575
576        // Wait for the result
577        let response = rx
578            .await
579            .map_err(|_| Error::new("Request processing was cancelled"))?;
580
581        #[cfg(with_metrics)]
582        {
583            let label = match &response {
584                PendingResponse::Daily(Ok(_)) => "success",
585                _ => "error",
586            };
587            metrics::CLAIM_REQUESTS_TOTAL
588                .with_label_values(&[label])
589                .inc();
590        }
591
592        match response {
593            PendingResponse::Daily(result) => result,
594            PendingResponse::Initial(_) => Err(Error::new("Unexpected response type")),
595        }
596    }
597}
598/// Multiplies a `u128` with a `u64` and returns the result as a 192-bit number.
599fn multiply(a: u128, b: u64) -> [u64; 3] {
600    let lower = u128::from(u64::MAX);
601    let b = u128::from(b);
602    let mut a1 = (a >> 64) * b;
603    let a0 = (a & lower) * b;
604    a1 += a0 >> 64;
605    [(a1 >> 64) as u64, (a1 & lower) as u64, (a0 & lower) as u64]
606}
607
608/// Retrieves a chain description from storage by reading the blob directly.
609///
610/// This function handles errors appropriately and returns a GraphQL-compatible result.
611async fn get_chain_description_from_storage<S>(
612    storage: &S,
613    chain_id: ChainId,
614) -> Result<ChainDescription, Error>
615where
616    S: Storage,
617{
618    // Create blob ID from chain ID - the chain ID is the hash of the chain description blob
619    let blob_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
620
621    // Read the blob directly from storage
622    let blob = storage
623        .read_blob(blob_id)
624        .await
625        .map_err(|e| {
626            tracing::error!(?chain_id, ?e, "failed to read chain description blob");
627            Error::new(format!(
628                "Storage error while reading chain description: {e}"
629            ))
630        })?
631        .ok_or_else(|| {
632            tracing::error!(?chain_id, "chain description blob not found for chain");
633            Error::new(format!("Chain description not found for chain {chain_id}"))
634        })?;
635
636    // Deserialize the chain description from the blob bytes
637    let description = bcs::from_bytes::<ChainDescription>(blob.bytes()).map_err(|e| {
638        tracing::error!(?e, ?chain_id, "failed to deserialize chain description",);
639        Error::new(format!(
640            "Invalid chain description data for chain {chain_id}"
641        ))
642    })?;
643
644    Ok(description)
645}
646
647impl<C> BatchProcessor<C>
648where
649    C: ClientContext + 'static,
650{
    /// Creates a new batch processor.
    ///
    /// All arguments are stored as-is; no work is done until `run` is called.
    fn new(
        config: BatchProcessorConfig,
        context: Arc<Mutex<C>>,
        client: ChainClient<C::Environment>,
        faucet_storage: Arc<FaucetDatabase>,
        pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
        request_notifier: Arc<Notify>,
    ) -> Self {
        Self {
            config,
            context,
            client,
            faucet_storage,
            pending_requests,
            request_notifier,
        }
    }
669
670    /// Runs the batch processor loop.
671    async fn run(&mut self, cancellation_token: CancellationToken) {
672        loop {
673            tokio::select! {
674                _ = self.request_notifier.notified() => {
675                    if let Err(e) = self.process_batch().await {
676                        tracing::error!(?e, "batch processing error");
677                    }
678                }
679                _ = cancellation_token.cancelled() => {
680                    // Process any remaining requests before shutting down
681                    if let Err(e) = self.process_batch().await {
682                        tracing::error!(?e, "final batch processing error");
683                    }
684                    break;
685                }
686            }
687        }
688    }
689
690    /// Processes batches until there are no more pending requests in the queue.
691    async fn process_batch(&mut self) -> anyhow::Result<()> {
692        loop {
693            let batch_requests = self.get_request_batch().await;
694
695            if batch_requests.is_empty() {
696                return Ok(());
697            }
698
699            let batch_size = batch_requests.len();
700            tracing::info!(?batch_size, "processing requests");
701
702            #[cfg(with_metrics)]
703            {
704                metrics::BATCH_SIZE
705                    .with_label_values(&[])
706                    .observe(batch_size as f64);
707
708                metrics::QUEUE_SIZE.with_label_values(&[]).observe(0.0);
709            }
710
711            #[cfg(with_metrics)]
712            let batch_start_time = std::time::Instant::now();
713
714            let batch_result = self.execute_batch(batch_requests).await;
715
716            #[cfg(with_metrics)]
717            {
718                let elapsed_ms = batch_start_time.elapsed().as_secs_f64() * 1000.0;
719                let label = if batch_result.is_ok() {
720                    "success"
721                } else {
722                    "error"
723                };
724                metrics::BATCH_PROCESSING_LATENCY
725                    .with_label_values(&[label])
726                    .observe(elapsed_ms);
727            }
728
729            if let Err(err) = batch_result {
730                tracing::error!(?err, "batch execution error");
731                return Err(err);
732            }
733        }
734    }
735
736    // Collects requests from the queue; validates and filters them.
737    async fn get_request_batch(&self) -> Vec<PendingRequest> {
738        let mut batch_requests = Vec::new();
739        let mut requests = self.pending_requests.lock().await;
740        while batch_requests.len() < self.config.max_batch_size {
741            let Some(request) = requests.pop_front() else {
742                break;
743            };
744
745            match self.validate_request(&request).await {
746                Ok(()) => {
747                    batch_requests.push(request);
748                }
749                Err(err) => {
750                    request.send_err(err);
751                }
752            }
753        }
754        batch_requests
755    }
756
757    /// Validates a pending request based on whether it's an initial or daily claim.
758    async fn validate_request(&self, request: &PendingRequest) -> Result<(), Error> {
759        if request.is_daily() {
760            // Verify the initial claim still exists.
761            let initial_claim = match self.faucet_storage.initial_claim(&request.owner).await {
762                Ok(Some(record)) => record,
763                Ok(None) => {
764                    return Err(Error::new(
765                        "You must claim a chain before making daily claims",
766                    ));
767                }
768                Err(err) => {
769                    tracing::error!("Database error: {err}");
770                    return Err(Error::new(err.to_string()));
771                }
772            };
773
774            // Verify the daily claim period.
775            let now = self.client.storage_client().clock().current_time();
776            let period = current_daily_period(initial_claim.timestamp.micros(), now.micros());
777            let last_period = self
778                .faucet_storage
779                .last_daily_claim_period(&request.owner)
780                .await?
781                .unwrap_or(0);
782
783            if period <= last_period {
784                return Err(Error::new(
785                    "You have already claimed tokens for this period",
786                ));
787            }
788        } else {
789            match self.faucet_storage.get_chain_id(&request.owner).await {
790                Ok(None) => {}
791                Ok(Some(_)) => {
792                    #[cfg(with_metrics)]
793                    metrics::CLAIM_REQUESTS_TOTAL
794                        .with_label_values(&["duplicate"])
795                        .inc();
796                    return Err(Error::new("This user already has a chain"));
797                }
798                Err(err) => {
799                    tracing::error!("Database error: {err}");
800                    return Err(Error::new(err.to_string()));
801                }
802            }
803        }
804        Ok(())
805    }
806
807    /// Checks if the given requests can currently be fulfilled, based on the balance
808    /// and rate limiting settings. Returns an error if not.
809    async fn check_rate_limiting(&self, requests: &[PendingRequest]) -> async_graphql::Result<()> {
810        let end_timestamp = self.config.end_timestamp;
811        let start_timestamp = self.config.start_timestamp;
812        let local_time = self.client.storage_client().clock().current_time();
813        let full_duration = end_timestamp.delta_since(start_timestamp).as_micros();
814        let remaining_duration = end_timestamp.delta_since(local_time).as_micros();
815        let balance = self.client.local_balance().await?;
816
817        #[cfg(with_metrics)]
818        metrics::FAUCET_BALANCE
819            .with_label_values(&[])
820            .set(u128::from(balance) as i64);
821
822        let total_amount = requests
823            .iter()
824            .fold(Amount::ZERO, |acc, r| acc.saturating_add(r.amount));
825        let Ok(remaining_balance) = balance.try_sub(total_amount) else {
826            // Not enough balance - reject all requests
827            #[cfg(with_metrics)]
828            metrics::INSUFFICIENT_BALANCE_REJECTIONS
829                .with_label_values(&[])
830                .inc();
831            return Err(Error::new("The faucet is empty."));
832        };
833
834        // Rate limit: Locked token balance decreases lineraly with time, i.e.:
835        // remaining_balance / remaining_duration >= start_balance / full_duration
836        if multiply(u128::from(self.config.start_balance), remaining_duration)
837            > multiply(u128::from(remaining_balance), full_duration)
838        {
839            #[cfg(with_metrics)]
840            metrics::RATE_LIMIT_REJECTIONS.with_label_values(&[]).inc();
841            return Err(Error::new("Not enough unlocked balance; try again later."));
842        }
843        Ok(())
844    }
845
846    /// Sends an error response to all requestors.
847    fn send_err(requests: Vec<PendingRequest>, err: impl Into<async_graphql::Error>) {
848        let err = err.into();
849        for request in requests {
850            request.send_err(err.clone());
851        }
852    }
853
854    /// Executes a batch of chain creation and/or token transfer requests.
855    async fn execute_batch(&mut self, requests: Vec<PendingRequest>) -> anyhow::Result<()> {
856        if let Err(err) = self.check_rate_limiting(&requests).await {
857            tracing::debug!("Rejecting requests due to rate limiting: {err:?}");
858            Self::send_err(requests, err);
859            return Ok(());
860        }
861
862        // Build operations: OpenChain for initial claims, Transfer for daily claims.
863        let mut operations = Vec::new();
864        for request in &requests {
865            let operation = if let Some(target_chain_id) = request.target_chain_id {
866                Operation::system(SystemOperation::Transfer {
867                    owner: AccountOwner::CHAIN,
868                    recipient: Account {
869                        chain_id: target_chain_id,
870                        owner: request.owner,
871                    },
872                    amount: request.amount,
873                })
874            } else {
875                let config = OpenChainConfig {
876                    ownership: ChainOwnership::single(request.owner),
877                    balance: request.amount,
878                    application_permissions: ApplicationPermissions::default(),
879                };
880                Operation::system(SystemOperation::OpenChain(config))
881            };
882            operations.push(operation);
883        }
884
885        // Execute all operations in a single block
886        let execute_ops_start = Instant::now();
887        let result = self.client.execute_operations(operations, vec![]).await;
888        tracing::debug!(
889            execute_operations_ms = execute_ops_start.elapsed().as_millis(),
890            "execute_operations completed"
891        );
892        let waiting_for_lock_start = Instant::now();
893        self.context
894            .lock()
895            .await
896            .update_wallet(&self.client)
897            .await?;
898        tracing::debug!(
899            wait_time_ms = waiting_for_lock_start.elapsed().as_millis(),
900            "wallet updated after executing operations"
901        );
902        let certificate = match result {
903            Err(chain_client::Error::LocalNodeError(LocalNodeError::WorkerError(
904                WorkerError::ChainError(chain_err),
905            ))) => {
906                tracing::debug!(error=?chain_err, "local worker errored when executing operations");
907                match *chain_err {
908                    ChainError::ExecutionError(exec_err, ChainExecutionContext::Operation(i))
909                        if i > 0
910                            && matches!(
911                                *exec_err,
912                                ExecutionError::BlockTooLarge
913                                    | ExecutionError::FeesExceedFunding { .. }
914                                    | ExecutionError::InsufficientBalance { .. }
915                                    | ExecutionError::MaximumFuelExceeded(_)
916                            ) =>
917                    {
918                        tracing::error!(index=?i, %exec_err, "execution of operation failed; reducing batch size");
919
920                        #[cfg(with_metrics)]
921                        {
922                            let error_type = match *exec_err {
923                                ExecutionError::BlockTooLarge => "block_too_large",
924                                ExecutionError::FeesExceedFunding { .. } => "fees_exceed_funding",
925                                ExecutionError::InsufficientBalance { .. } => {
926                                    "insufficient_balance"
927                                }
928                                ExecutionError::MaximumFuelExceeded(_) => "fuel_exceeded",
929                                _ => "other",
930                            };
931                            metrics::RETRYABLE_ERRORS
932                                .with_label_values(&[error_type])
933                                .inc();
934                        }
935
936                        self.config.max_batch_size = i as usize;
937                        // Put the valid requests back into the queue.
938                        let mut pending_requests = self.pending_requests.lock().await;
939                        for request in requests.into_iter().rev() {
940                            pending_requests.push_front(request);
941                        }
942                        return Ok(()); // Don't return an error, so we retry.
943                    }
944                    chain_err => {
945                        Self::send_err(requests, chain_err.to_string());
946                        return Err(chain_client::Error::LocalNodeError(
947                            LocalNodeError::WorkerError(WorkerError::ChainError(chain_err.into())),
948                        )
949                        .into());
950                    }
951                }
952            }
953            Err(err) => {
954                tracing::debug!("Error executing operations: {err}");
955                Self::send_err(requests, err.to_string());
956                return Err(err.into());
957            }
958            Ok(ClientOutcome::Committed(certificate)) => certificate,
959            Ok(ClientOutcome::WaitForTimeout(timeout)) => {
960                let error_msg = format!(
961                    "This faucet is using a multi-owner chain and is not the leader right now. \
962                    Try again at {}",
963                    timeout.timestamp,
964                );
965                Self::send_err(requests, error_msg.clone());
966                return Ok(());
967            }
968            Ok(ClientOutcome::Conflict(certificate)) => {
969                let error_msg = format!(
970                    "A different block was committed at this height: {}",
971                    certificate.hash(),
972                );
973                Self::send_err(requests, error_msg.clone());
974                return Ok(());
975            }
976        };
977
978        let certificate_hash = certificate.hash();
979        let block_timestamp = certificate.block().header.timestamp;
980
981        // Parse chain descriptions from the block's blobs (for initial claims only).
982        let chain_descriptions = extract_opened_single_owner_chains(&certificate)?;
983
984        // Build a map of owner -> description for initial claims.
985        let initial_desc_map: std::collections::HashMap<_, _> =
986            chain_descriptions.into_iter().collect();
987
988        // Split requests by claim type for storage.
989        let initial_chains: Vec<_> = initial_desc_map
990            .iter()
991            .map(|(owner, description)| (*owner, description.id()))
992            .collect();
993        let daily_claims: Vec<_> = requests
994            .iter()
995            .filter_map(|r| {
996                r.target_chain_id
997                    .map(|chain_id| (r.owner, chain_id, r.daily_period))
998            })
999            .collect();
1000
1001        #[cfg(with_metrics)]
1002        let store_chains_start = std::time::Instant::now();
1003
1004        let store_initial = async {
1005            if initial_chains.is_empty() {
1006                return Ok(());
1007            }
1008            self.faucet_storage
1009                .store_chains_batch(initial_chains, block_timestamp)
1010                .await
1011        };
1012        let store_daily = async {
1013            if daily_claims.is_empty() {
1014                return Ok(());
1015            }
1016            self.faucet_storage
1017                .store_daily_claims_batch(daily_claims)
1018                .await
1019        };
1020
1021        if let Err(e) = futures::try_join!(store_initial, store_daily) {
1022            let error_msg = format!("Failed to save claims to database: {e}");
1023            Self::send_err(requests, error_msg.clone());
1024            anyhow::bail!(error_msg);
1025        }
1026        #[cfg(with_metrics)]
1027        {
1028            let elapsed_ms = store_chains_start.elapsed().as_secs_f64() * 1000.0;
1029            metrics::STORE_CHAIN_LATENCY
1030                .with_label_values(&[])
1031                .observe(elapsed_ms);
1032        }
1033
1034        // Respond to requests.
1035        #[cfg(with_metrics)]
1036        let chains_created = initial_desc_map.len();
1037
1038        for request in requests {
1039            #[cfg(with_metrics)]
1040            {
1041                let wait_time = request.queued_at.elapsed().as_secs_f64() * 1000.0;
1042                metrics::QUEUE_WAIT_TIME
1043                    .with_label_values(&[])
1044                    .observe(wait_time);
1045            }
1046
1047            let response = if let Some(target_chain_id) = request.target_chain_id {
1048                PendingResponse::Daily(Ok(ClaimOutcome {
1049                    chain_id: target_chain_id,
1050                    certificate_hash,
1051                    amount: request.amount,
1052                }))
1053            } else if let Some(description) = initial_desc_map.get(&request.owner) {
1054                PendingResponse::Initial(Ok(Box::new(description.clone())))
1055            } else {
1056                PendingResponse::Initial(Err(Error::new(format!(
1057                    "No chain created for owner {}",
1058                    request.owner
1059                ))))
1060            };
1061            if request.responder.send(response).is_err() {
1062                tracing::warn!(
1063                    "Receiver dropped while sending response to {}.",
1064                    request.owner
1065                );
1066            }
1067        }
1068
1069        #[cfg(with_metrics)]
1070        metrics::CHAINS_CREATED_TOTAL
1071            .with_label_values(&[])
1072            .inc_by(chains_created as u64);
1073
1074        Ok(())
1075    }
1076}
1077
1078/// A GraphQL interface to request a new chain with tokens.
1079pub struct FaucetService<C>
1080where
1081    C: ClientContext,
1082{
1083    chain_id: ChainId,
1084    context: Arc<Mutex<C>>,
1085    client: ChainClient<C::Environment>,
1086    genesis_config: Arc<GenesisConfig>,
1087    config: ChainListenerConfig,
1088    storage: <C::Environment as linera_core::Environment>::Storage,
1089    port: u16,
1090    #[cfg(feature = "metrics")]
1091    metrics_port: u16,
1092    initial_claim_amount: Amount,
1093    daily_claim_amount: Amount,
1094    end_timestamp: Timestamp,
1095    start_timestamp: Timestamp,
1096    start_balance: Amount,
1097    faucet_storage: Arc<FaucetDatabase>,
1098    storage_path: PathBuf,
1099    /// Batching components
1100    pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
1101    request_notifier: Arc<Notify>,
1102    max_batch_size: usize,
1103    enable_memory_profiling: bool,
1104}
1105
1106impl<C> Clone for FaucetService<C>
1107where
1108    C: ClientContext + 'static,
1109{
1110    fn clone(&self) -> Self {
1111        Self {
1112            chain_id: self.chain_id,
1113            context: Arc::clone(&self.context),
1114            client: self.client.clone(),
1115            genesis_config: Arc::clone(&self.genesis_config),
1116            config: self.config.clone(),
1117            storage: self.storage.clone(),
1118            port: self.port,
1119            #[cfg(feature = "metrics")]
1120            metrics_port: self.metrics_port,
1121            initial_claim_amount: self.initial_claim_amount,
1122            daily_claim_amount: self.daily_claim_amount,
1123            end_timestamp: self.end_timestamp,
1124            start_timestamp: self.start_timestamp,
1125            start_balance: self.start_balance,
1126            faucet_storage: Arc::clone(&self.faucet_storage),
1127            storage_path: self.storage_path.clone(),
1128            pending_requests: Arc::clone(&self.pending_requests),
1129            request_notifier: Arc::clone(&self.request_notifier),
1130            max_batch_size: self.max_batch_size,
1131            enable_memory_profiling: self.enable_memory_profiling,
1132        }
1133    }
1134}
1135
1136pub struct FaucetConfig {
1137    pub port: u16,
1138    #[cfg(feature = "metrics")]
1139    pub metrics_port: u16,
1140    pub chain_id: ChainId,
1141    pub initial_claim_amount: Amount,
1142    pub daily_claim_amount: Amount,
1143    pub end_timestamp: Timestamp,
1144    pub genesis_config: std::sync::Arc<GenesisConfig>,
1145    pub chain_listener_config: ChainListenerConfig,
1146    pub storage_path: PathBuf,
1147    pub max_batch_size: usize,
1148    pub enable_memory_profiling: bool,
1149}
1150
1151impl<C> FaucetService<C>
1152where
1153    C: ClientContext + 'static,
1154{
1155    /// Creates a new instance of the faucet service.
1156    pub async fn new(config: FaucetConfig, context: C) -> anyhow::Result<Self> {
1157        let storage = context.storage().clone();
1158        let client = context.make_chain_client(config.chain_id).await?;
1159        let context = Arc::new(Mutex::new(context));
1160        let start_timestamp = client.storage_client().clock().current_time();
1161        client.process_inbox().await?;
1162        let start_balance = client.local_balance().await?;
1163
1164        // Use provided storage path
1165        let storage_path = config.storage_path.clone();
1166
1167        // Initialize database.
1168        let faucet_storage = FaucetDatabase::new(&storage_path)
1169            .await
1170            .context("Failed to initialize faucet database")?;
1171
1172        // Synchronize database with blockchain history.
1173        if let Err(e) = faucet_storage.sync_with_blockchain(&client).await {
1174            tracing::warn!("Failed to synchronize database with blockchain: {}", e);
1175        }
1176
1177        let faucet_storage = Arc::new(faucet_storage);
1178
1179        // Initialize batching components
1180        let pending_requests = Arc::new(Mutex::new(VecDeque::new()));
1181        let request_notifier = Arc::new(Notify::new());
1182
1183        Ok(Self {
1184            chain_id: config.chain_id,
1185            storage,
1186            context,
1187            client,
1188            genesis_config: config.genesis_config,
1189            config: config.chain_listener_config,
1190            port: config.port,
1191            #[cfg(feature = "metrics")]
1192            metrics_port: config.metrics_port,
1193            initial_claim_amount: config.initial_claim_amount,
1194            daily_claim_amount: config.daily_claim_amount,
1195            end_timestamp: config.end_timestamp,
1196            start_timestamp,
1197            start_balance,
1198            faucet_storage,
1199            storage_path,
1200            pending_requests,
1201            request_notifier,
1202            max_batch_size: config.max_batch_size,
1203            enable_memory_profiling: config.enable_memory_profiling,
1204        })
1205    }
1206
1207    fn schema(
1208        &self,
1209    ) -> Schema<
1210        QueryRoot<C>,
1211        MutationRoot<<C::Environment as linera_core::Environment>::Storage>,
1212        EmptySubscription,
1213    > {
1214        let mutation_root = MutationRoot {
1215            faucet_storage: Arc::clone(&self.faucet_storage),
1216            pending_requests: Arc::clone(&self.pending_requests),
1217            request_notifier: Arc::clone(&self.request_notifier),
1218            storage: self.storage.clone(),
1219            initial_claim_amount: self.initial_claim_amount,
1220            daily_claim_amount: self.daily_claim_amount,
1221        };
1222        let query_root = QueryRoot {
1223            genesis_config: Arc::clone(&self.genesis_config),
1224            client: self.client.clone(),
1225            faucet_storage: Arc::clone(&self.faucet_storage),
1226        };
1227        Schema::build(query_root, mutation_root, EmptySubscription).finish()
1228    }
1229
1230    #[cfg(feature = "metrics")]
1231    fn metrics_address(&self) -> SocketAddr {
1232        SocketAddr::from(([0, 0, 0, 0], self.metrics_port))
1233    }
1234
1235    /// Runs the faucet.
1236    #[tracing::instrument(name = "FaucetService::run", skip_all, fields(port = self.port, chain_id = ?self.chain_id))]
1237    pub async fn run(self, cancellation_token: CancellationToken) -> anyhow::Result<()> {
1238        let port = self.port;
1239        let index_handler = axum::routing::get(graphiql).post(Self::index_handler);
1240
1241        #[cfg(feature = "metrics")]
1242        monitoring_server::start_metrics_with_profiling(
1243            self.metrics_address(),
1244            cancellation_token.clone(),
1245            self.enable_memory_profiling,
1246        )
1247        .await;
1248
1249        let app = Router::new()
1250            .route("/", index_handler)
1251            .route("/ready", axum::routing::get(|| async { "ready!" }))
1252            .route_service("/ws", GraphQLSubscription::new(self.schema()))
1253            .layer(Extension(self.clone()))
1254            .layer(CorsLayer::permissive());
1255
1256        info!("GraphiQL IDE: http://localhost:{}", port);
1257
1258        // Start the batch processor
1259        let batch_processor_config = BatchProcessorConfig {
1260            end_timestamp: self.end_timestamp,
1261            start_timestamp: self.start_timestamp,
1262            start_balance: self.start_balance,
1263            max_batch_size: self.max_batch_size,
1264        };
1265        let mut batch_processor = BatchProcessor::new(
1266            batch_processor_config,
1267            Arc::clone(&self.context),
1268            self.client.clone(),
1269            Arc::clone(&self.faucet_storage),
1270            Arc::clone(&self.pending_requests),
1271            Arc::clone(&self.request_notifier),
1272        );
1273
1274        let chain_listener = ChainListener::new(
1275            self.config,
1276            self.context,
1277            self.storage,
1278            cancellation_token.clone(),
1279            tokio::sync::mpsc::unbounded_channel().1,
1280            false, // Faucet doesn't receive messages, so no need for background sync
1281        )
1282        .run()
1283        .await?;
1284        let batch_processor_task = batch_processor.run(cancellation_token.clone());
1285        let tcp_listener =
1286            tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
1287        let server = axum::serve(tcp_listener, app)
1288            .with_graceful_shutdown(cancellation_token.cancelled_owned())
1289            .into_future();
1290        futures::select! {
1291            result = Box::pin(chain_listener).fuse() => result?,
1292            _ = Box::pin(batch_processor_task).fuse() => {},
1293            result = Box::pin(server).fuse() => result?,
1294        };
1295
1296        Ok(())
1297    }
1298
1299    /// Executes a GraphQL query and generates a response for our `Schema`.
1300    async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1301        let schema = service.0.schema();
1302        schema.execute(request.into_inner()).await.into()
1303    }
1304}
1305
1306/// Returns all `(AccountOwner, ChainDescription)` pairs for single-owner chains created in this
1307/// block.
1308fn extract_opened_single_owner_chains(
1309    certificate: &ConfirmedBlockCertificate,
1310) -> anyhow::Result<Vec<(AccountOwner, ChainDescription)>> {
1311    let created_blobs = certificate.block().body.blobs.iter().flatten();
1312    created_blobs
1313        .filter_map(|blob| {
1314            if blob.content().blob_type() != BlobType::ChainDescription {
1315                return None;
1316            }
1317            let description = match bcs::from_bytes::<ChainDescription>(blob.bytes()) {
1318                Err(err) => return Some(Err(anyhow::anyhow!(err))),
1319                Ok(description) => description,
1320            };
1321            let chain_id = description.id();
1322            let owner = {
1323                let mut owners = description.config().ownership.all_owners();
1324                match (owners.next(), owners.next()) {
1325                    (Some(owner), None) => *owner,
1326                    (None, None) | (_, Some(_)) => {
1327                        tracing::info!("Skipping chain {chain_id}; not exactly one owner.");
1328                        return None;
1329                    }
1330                }
1331            };
1332            tracing::debug!("Found chain {chain_id} created for owner {owner}",);
1333            Some(Ok((owner, description)))
1334        })
1335        .collect::<Result<Vec<_>, _>>()
1336}