1#![recursion_limit = "512"]
5
6mod database;
9
10use std::{
11 collections::VecDeque, future::IntoFuture, net::SocketAddr, path::PathBuf, sync::Arc,
12 time::Instant,
13};
14
15use anyhow::Context as _;
16use async_graphql::{EmptySubscription, Error, Schema, SimpleObject};
17use async_graphql_axum::{GraphQLRequest, GraphQLResponse, GraphQLSubscription};
18use axum::{Extension, Router};
19use futures::{lock::Mutex, FutureExt as _};
20#[cfg(with_metrics)]
21use linera_base::prometheus_util::MeasureLatency as _;
22use linera_base::{
23 bcs,
24 crypto::{CryptoHash, ValidatorPublicKey},
25 data_types::{Amount, ApplicationPermissions, ChainDescription, Epoch, TimeDelta, Timestamp},
26 identifiers::{Account, AccountOwner, BlobId, BlobType, ChainId},
27 ownership::ChainOwnership,
28};
29use linera_chain::{types::ConfirmedBlockCertificate, ChainError, ChainExecutionContext};
30use linera_client::{
31 chain_listener::{ChainListener, ChainListenerConfig, ClientContext},
32 config::GenesisConfig,
33};
34use linera_core::{
35 client::{chain_client, ChainClient},
36 data_types::ClientOutcome,
37 worker::WorkerError,
38 LocalNodeError,
39};
40use linera_execution::{
41 system::{OpenChainConfig, SystemOperation},
42 Committee, ExecutionError, Operation,
43};
44#[cfg(feature = "metrics")]
45use linera_metrics::monitoring_server;
46use linera_storage::{Clock as _, Storage};
47use serde::Deserialize;
48use tokio::sync::{oneshot, Notify};
49use tokio_util::sync::CancellationToken;
50use tower_http::cors::CorsLayer;
51use tracing::info;
52
53use crate::database::FaucetDatabase;
54
55#[cfg(with_metrics)]
57mod metrics {
58 use std::sync::LazyLock;
59
60 use linera_base::prometheus_util::{
61 exponential_bucket_interval, register_histogram_vec, register_int_counter_vec,
62 register_int_gauge_vec,
63 };
64 use prometheus::{HistogramVec, IntCounterVec, IntGaugeVec};
65
66 pub static CLAIM_REQUESTS_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
67 register_int_counter_vec(
68 "faucet_claim_requests_total",
69 "Total number of claim requests by result",
70 &["result"],
71 )
72 });
73
74 pub static CLAIM_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
75 register_histogram_vec(
76 "faucet_claim_latency_ms",
77 "End-to-end latency of claim requests in milliseconds",
78 &["result"],
79 exponential_bucket_interval(10.0, 40_000.0),
80 )
81 });
82
83 pub static CHAINS_CREATED_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
84 register_int_counter_vec(
85 "faucet_chains_created_total",
86 "Total number of chains created by the faucet",
87 &[],
88 )
89 });
90
91 pub static BATCH_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
92 register_histogram_vec(
93 "faucet_batch_size",
94 "Number of chain creation requests per batch",
95 &[],
96 Some(vec![1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0]),
97 )
98 });
99
100 pub static BATCH_PROCESSING_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
101 register_histogram_vec(
102 "faucet_batch_processing_latency_ms",
103 "Time to process a batch of chain creation requests in milliseconds",
104 &["result"],
105 exponential_bucket_interval(10.0, 40_000.0),
106 )
107 });
108
109 pub static QUEUE_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
110 register_histogram_vec(
111 "faucet_queue_size",
112 "Number of pending claim requests in the queue",
113 &[],
114 Some(vec![
115 0.0, 1.0, 2.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 500.0,
116 ]),
117 )
118 });
119
120 pub static QUEUE_WAIT_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
121 register_histogram_vec(
122 "faucet_queue_wait_time_ms",
123 "Time a request spends in the queue before processing in milliseconds",
124 &[],
125 exponential_bucket_interval(10.0, 40_000.0),
126 )
127 });
128
129 pub static FAUCET_BALANCE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
130 register_int_gauge_vec(
131 "faucet_balance_amount",
132 "Current balance of the faucet chain",
133 &[],
134 )
135 });
136
137 pub static RATE_LIMIT_REJECTIONS: LazyLock<IntCounterVec> = LazyLock::new(|| {
138 register_int_counter_vec(
139 "faucet_rate_limit_rejections_total",
140 "Number of requests rejected due to rate limiting",
141 &[],
142 )
143 });
144
145 pub static INSUFFICIENT_BALANCE_REJECTIONS: LazyLock<IntCounterVec> = LazyLock::new(|| {
146 register_int_counter_vec(
147 "faucet_insufficient_balance_rejections_total",
148 "Number of requests rejected due to insufficient faucet balance",
149 &[],
150 )
151 });
152
153 pub static DATABASE_OPERATION_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
154 register_histogram_vec(
155 "faucet_database_operation_latency_ms",
156 "Database operation latency in milliseconds",
157 &["operation"],
158 exponential_bucket_interval(0.5, 2000.0),
159 )
160 });
161
162 pub static RETRYABLE_ERRORS: LazyLock<IntCounterVec> = LazyLock::new(|| {
163 register_int_counter_vec(
164 "faucet_retryable_errors_total",
165 "Number of chain execution retryable errors by type",
166 &["error_type"],
167 )
168 });
169
170 pub static STORE_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
171 register_histogram_vec(
172 "faucet_store_chain_latency_ms",
173 "Latency of storing chain information in the database in milliseconds",
174 &[],
175 exponential_bucket_interval(10.0, 2000.0),
176 )
177 });
178}
179
180pub(crate) async fn graphiql(uri: axum::http::Uri) -> impl axum::response::IntoResponse {
182 axum::response::Html(
183 async_graphql::http::GraphiQLSource::build()
184 .endpoint(uri.path())
185 .subscription_endpoint("/ws")
186 .finish(),
187 )
188}
189
190#[cfg(test)]
191mod tests;
192
193pub struct QueryRoot<C: ClientContext> {
195 client: ChainClient<C::Environment>,
196 genesis_config: Arc<GenesisConfig>,
197 faucet_storage: Arc<FaucetDatabase>,
198}
199
200pub struct MutationRoot<S> {
202 faucet_storage: Arc<FaucetDatabase>,
203 pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
204 request_notifier: Arc<Notify>,
205 storage: S,
206 initial_claim_amount: Amount,
208 daily_claim_amount: Amount,
210}
211
212#[derive(Clone, Debug, SimpleObject)]
214pub struct ClaimOutcome {
215 pub chain_id: ChainId,
217 pub certificate_hash: CryptoHash,
219 pub amount: Amount,
221}
222
223#[derive(Clone, Debug, SimpleObject)]
225pub struct InitialClaim {
226 pub chain_id: ChainId,
228 pub timestamp: Timestamp,
230}
231
232#[derive(Debug, Deserialize, SimpleObject)]
233pub struct Validator {
234 pub public_key: ValidatorPublicKey,
235 pub network_address: String,
236}
237
238#[derive(Debug)]
240enum PendingResponse {
241 Initial(Result<Box<ChainDescription>, Error>),
243 Daily(Result<ClaimOutcome, Error>),
245}
246
247#[derive(Debug)]
252struct PendingRequest {
253 owner: AccountOwner,
254 target_chain_id: Option<ChainId>,
256 amount: Amount,
258 daily_period: u64,
260 responder: oneshot::Sender<PendingResponse>,
261 #[cfg(with_metrics)]
262 queued_at: std::time::Instant,
263}
264
265impl PendingRequest {
266 fn is_daily(&self) -> bool {
267 self.target_chain_id.is_some()
268 }
269
270 fn send_err(self, err: Error) {
271 let response = if self.is_daily() {
272 PendingResponse::Daily(Err(err))
273 } else {
274 PendingResponse::Initial(Err(err))
275 };
276 if self.responder.send(response).is_err() {
277 tracing::warn!("Receiver dropped while sending error to {}.", self.owner);
278 }
279 }
280}
281
282#[derive(Clone)]
284struct BatchProcessorConfig {
285 end_timestamp: Timestamp,
286 start_timestamp: Timestamp,
287 start_balance: Amount,
288 max_batch_size: usize,
289}
290
291struct BatchProcessor<C: ClientContext> {
293 config: BatchProcessorConfig,
294 context: Arc<Mutex<C>>,
295 client: ChainClient<C::Environment>,
296 faucet_storage: Arc<FaucetDatabase>,
297 pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
298 request_notifier: Arc<Notify>,
299}
300
301#[async_graphql::Object(cache_control(no_cache))]
302impl<C> QueryRoot<C>
303where
304 C: ClientContext + 'static,
305{
306 async fn version(&self) -> linera_version::VersionInfo {
308 linera_version::VersionInfo::default()
309 }
310
311 async fn genesis_config(&self) -> Result<serde_json::Value, Error> {
313 Ok(serde_json::to_value(&*self.genesis_config)?)
314 }
315
316 async fn current_validators(&self) -> Result<Vec<Validator>, Error> {
318 let committee = self.client.local_committee().await?;
319 Ok(committee
320 .validators()
321 .iter()
322 .map(|(public_key, validator)| Validator {
323 public_key: *public_key,
324 network_address: validator.network_address.clone(),
325 })
326 .collect())
327 }
328
329 async fn current_committee(&self) -> Result<Committee, Error> {
331 Ok((*self.client.local_committee().await?).clone())
332 }
333
334 async fn current_epoch(&self) -> Result<Epoch, Error> {
336 let info = self.client.chain_info().await?;
337 Ok(info.epoch)
338 }
339
340 async fn chain_id(&self, owner: AccountOwner) -> Result<ChainId, Error> {
342 #[cfg(with_metrics)]
344 let histogram = metrics::DATABASE_OPERATION_LATENCY.with_label_values(&["get_chain_id"]);
345 #[cfg(with_metrics)]
346 let _latency = histogram.measure_latency();
347
348 let chain_id = self.faucet_storage.get_chain_id(&owner).await?;
349
350 chain_id.ok_or_else(|| Error::new("This user has no chain yet"))
351 }
352
353 async fn initial_claim(&self, owner: AccountOwner) -> Result<Option<InitialClaim>, Error> {
355 let claim_record = self.faucet_storage.initial_claim(&owner).await?;
356
357 Ok(claim_record.map(|r| InitialClaim {
358 chain_id: r.chain_id,
359 timestamp: r.timestamp,
360 }))
361 }
362
363 async fn next_daily_claim(&self, owner: AccountOwner) -> Result<Option<Timestamp>, Error> {
367 let initial_claim = match self.faucet_storage.initial_claim(&owner).await? {
368 Some(record) => record,
369 None => return Ok(None),
370 };
371
372 let initial_micros = initial_claim.timestamp.micros();
373
374 let last_period = self
376 .faucet_storage
377 .last_daily_claim_period(&owner)
378 .await?
379 .unwrap_or(0);
380
381 Ok(Some(Timestamp::from(
383 initial_micros + (last_period + 1) * DAILY_PERIOD_MICROS,
384 )))
385 }
386}
387
388const DAILY_PERIOD_MICROS: u64 = TimeDelta::from_secs(24 * 60 * 60).as_micros();
390
391fn current_daily_period(initial_claim_micros: u64, now_micros: u64) -> u64 {
393 now_micros.saturating_sub(initial_claim_micros) / DAILY_PERIOD_MICROS
394}
395
396async fn record_claim_latency<T>(
398 future: impl std::future::Future<Output = Result<T, Error>>,
399) -> Result<T, Error> {
400 #[cfg(with_metrics)]
401 let start_time = std::time::Instant::now();
402
403 let result = future.await;
404
405 #[cfg(with_metrics)]
406 {
407 let label = if result.is_ok() { "success" } else { "error" };
408 metrics::CLAIM_LATENCY
409 .with_label_values(&[label])
410 .observe(start_time.elapsed().as_secs_f64() * 1000.0);
411 }
412
413 result
414}
415
416#[async_graphql::Object(cache_control(no_cache))]
417impl<S> MutationRoot<S>
418where
419 S: Storage + Send + Sync + 'static,
420{
421 async fn claim(&self, owner: AccountOwner) -> Result<ChainDescription, Error> {
423 record_claim_latency(self.do_claim(owner)).await
424 }
425
426 async fn daily_claim(&self, owner: AccountOwner) -> Result<ClaimOutcome, Error> {
430 record_claim_latency(self.do_daily_claim(owner)).await
431 }
432}
433
434impl<S> MutationRoot<S>
435where
436 S: Storage + Send + Sync + 'static,
437{
438 async fn do_claim(&self, owner: AccountOwner) -> Result<ChainDescription, Error> {
439 #[cfg(with_metrics)]
441 let histogram = metrics::DATABASE_OPERATION_LATENCY.with_label_values(&["get_chain_id"]);
442
443 tracing::debug!(account_owner=?owner, "claim request received");
444 #[cfg(with_metrics)]
445 let _latency = histogram.measure_latency();
446
447 let existing_chain_id = self.faucet_storage.get_chain_id(&owner).await?;
448
449 if let Some(existing_chain_id) = existing_chain_id {
450 #[cfg(with_metrics)]
451 metrics::CLAIM_REQUESTS_TOTAL
452 .with_label_values(&["duplicate"])
453 .inc();
454
455 let stored_chain =
456 get_chain_description_from_storage(&self.storage, existing_chain_id).await;
457 tracing::debug!(account_owner=?owner, "claim request processed; returning pre-existing chain");
458 return stored_chain;
459 }
460
461 let (tx, rx) = oneshot::channel();
463
464 {
466 let mut requests = self.pending_requests.lock().await;
467 requests.push_back(PendingRequest {
468 owner,
469 target_chain_id: None,
470 amount: self.initial_claim_amount,
471 daily_period: 0,
472 responder: tx,
473 #[cfg(with_metrics)]
474 queued_at: std::time::Instant::now(),
475 });
476
477 #[cfg(with_metrics)]
478 metrics::QUEUE_SIZE
479 .with_label_values(&[])
480 .observe(requests.len() as f64);
481 }
482
483 self.request_notifier.notify_one();
485
486 let response = rx
488 .await
489 .map_err(|_| Error::new("Request processing was cancelled"))?;
490
491 #[cfg(with_metrics)]
492 {
493 let label = match &response {
494 PendingResponse::Initial(Ok(_)) => "success",
495 _ => "error",
496 };
497 metrics::CLAIM_REQUESTS_TOTAL
498 .with_label_values(&[label])
499 .inc();
500 }
501
502 tracing::debug!(account_owner=?owner, "claim request processed; new chain created");
503 match response {
504 PendingResponse::Initial(result) => result.map(|b| *b),
505 PendingResponse::Daily(_) => Err(Error::new("Unexpected response type")),
506 }
507 }
508
509 async fn do_daily_claim(&self, owner: AccountOwner) -> Result<ClaimOutcome, Error> {
510 if self.daily_claim_amount == Amount::ZERO {
511 return Err(Error::new("Daily claims are not enabled on this faucet"));
512 }
513
514 let initial_claim = self
516 .faucet_storage
517 .initial_claim(&owner)
518 .await?
519 .ok_or_else(|| Error::new("You must claim a chain before making daily claims"))?;
520
521 let now = self.storage.clock().current_time();
522 let period = current_daily_period(initial_claim.timestamp.micros(), now.micros());
523 let last_period = self
524 .faucet_storage
525 .last_daily_claim_period(&owner)
526 .await?
527 .unwrap_or(0);
528
529 if period <= last_period {
530 return Err(Error::new(
531 "You have already claimed tokens for this period",
532 ));
533 }
534
535 self.enqueue_daily_request(
536 owner,
537 initial_claim.chain_id,
538 self.daily_claim_amount,
539 period,
540 )
541 .await
542 }
543
544 async fn enqueue_daily_request(
545 &self,
546 owner: AccountOwner,
547 target_chain_id: ChainId,
548 amount: Amount,
549 daily_period: u64,
550 ) -> Result<ClaimOutcome, Error> {
551 let (tx, rx) = oneshot::channel();
553
554 {
556 let mut requests = self.pending_requests.lock().await;
557 requests.push_back(PendingRequest {
558 owner,
559 target_chain_id: Some(target_chain_id),
560 amount,
561 daily_period,
562 responder: tx,
563 #[cfg(with_metrics)]
564 queued_at: std::time::Instant::now(),
565 });
566
567 #[cfg(with_metrics)]
568 metrics::QUEUE_SIZE
569 .with_label_values(&[])
570 .observe(requests.len() as f64);
571 }
572
573 self.request_notifier.notify_one();
575
576 let response = rx
578 .await
579 .map_err(|_| Error::new("Request processing was cancelled"))?;
580
581 #[cfg(with_metrics)]
582 {
583 let label = match &response {
584 PendingResponse::Daily(Ok(_)) => "success",
585 _ => "error",
586 };
587 metrics::CLAIM_REQUESTS_TOTAL
588 .with_label_values(&[label])
589 .inc();
590 }
591
592 match response {
593 PendingResponse::Daily(result) => result,
594 PendingResponse::Initial(_) => Err(Error::new("Unexpected response type")),
595 }
596 }
597}
598fn multiply(a: u128, b: u64) -> [u64; 3] {
600 let lower = u128::from(u64::MAX);
601 let b = u128::from(b);
602 let mut a1 = (a >> 64) * b;
603 let a0 = (a & lower) * b;
604 a1 += a0 >> 64;
605 [(a1 >> 64) as u64, (a1 & lower) as u64, (a0 & lower) as u64]
606}
607
608async fn get_chain_description_from_storage<S>(
612 storage: &S,
613 chain_id: ChainId,
614) -> Result<ChainDescription, Error>
615where
616 S: Storage,
617{
618 let blob_id = BlobId::new(chain_id.0, BlobType::ChainDescription);
620
621 let blob = storage
623 .read_blob(blob_id)
624 .await
625 .map_err(|e| {
626 tracing::error!(?chain_id, ?e, "failed to read chain description blob");
627 Error::new(format!(
628 "Storage error while reading chain description: {e}"
629 ))
630 })?
631 .ok_or_else(|| {
632 tracing::error!(?chain_id, "chain description blob not found for chain");
633 Error::new(format!("Chain description not found for chain {chain_id}"))
634 })?;
635
636 let description = bcs::from_bytes::<ChainDescription>(blob.bytes()).map_err(|e| {
638 tracing::error!(?e, ?chain_id, "failed to deserialize chain description",);
639 Error::new(format!(
640 "Invalid chain description data for chain {chain_id}"
641 ))
642 })?;
643
644 Ok(description)
645}
646
647impl<C> BatchProcessor<C>
648where
649 C: ClientContext + 'static,
650{
651 fn new(
653 config: BatchProcessorConfig,
654 context: Arc<Mutex<C>>,
655 client: ChainClient<C::Environment>,
656 faucet_storage: Arc<FaucetDatabase>,
657 pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
658 request_notifier: Arc<Notify>,
659 ) -> Self {
660 Self {
661 config,
662 context,
663 client,
664 faucet_storage,
665 pending_requests,
666 request_notifier,
667 }
668 }
669
670 async fn run(&mut self, cancellation_token: CancellationToken) {
672 loop {
673 tokio::select! {
674 _ = self.request_notifier.notified() => {
675 if let Err(e) = self.process_batch().await {
676 tracing::error!(?e, "batch processing error");
677 }
678 }
679 _ = cancellation_token.cancelled() => {
680 if let Err(e) = self.process_batch().await {
682 tracing::error!(?e, "final batch processing error");
683 }
684 break;
685 }
686 }
687 }
688 }
689
690 async fn process_batch(&mut self) -> anyhow::Result<()> {
692 loop {
693 let batch_requests = self.get_request_batch().await;
694
695 if batch_requests.is_empty() {
696 return Ok(());
697 }
698
699 let batch_size = batch_requests.len();
700 tracing::info!(?batch_size, "processing requests");
701
702 #[cfg(with_metrics)]
703 {
704 metrics::BATCH_SIZE
705 .with_label_values(&[])
706 .observe(batch_size as f64);
707
708 metrics::QUEUE_SIZE.with_label_values(&[]).observe(0.0);
709 }
710
711 #[cfg(with_metrics)]
712 let batch_start_time = std::time::Instant::now();
713
714 let batch_result = self.execute_batch(batch_requests).await;
715
716 #[cfg(with_metrics)]
717 {
718 let elapsed_ms = batch_start_time.elapsed().as_secs_f64() * 1000.0;
719 let label = if batch_result.is_ok() {
720 "success"
721 } else {
722 "error"
723 };
724 metrics::BATCH_PROCESSING_LATENCY
725 .with_label_values(&[label])
726 .observe(elapsed_ms);
727 }
728
729 if let Err(err) = batch_result {
730 tracing::error!(?err, "batch execution error");
731 return Err(err);
732 }
733 }
734 }
735
736 async fn get_request_batch(&self) -> Vec<PendingRequest> {
738 let mut batch_requests = Vec::new();
739 let mut requests = self.pending_requests.lock().await;
740 while batch_requests.len() < self.config.max_batch_size {
741 let Some(request) = requests.pop_front() else {
742 break;
743 };
744
745 match self.validate_request(&request).await {
746 Ok(()) => {
747 batch_requests.push(request);
748 }
749 Err(err) => {
750 request.send_err(err);
751 }
752 }
753 }
754 batch_requests
755 }
756
757 async fn validate_request(&self, request: &PendingRequest) -> Result<(), Error> {
759 if request.is_daily() {
760 let initial_claim = match self.faucet_storage.initial_claim(&request.owner).await {
762 Ok(Some(record)) => record,
763 Ok(None) => {
764 return Err(Error::new(
765 "You must claim a chain before making daily claims",
766 ));
767 }
768 Err(err) => {
769 tracing::error!("Database error: {err}");
770 return Err(Error::new(err.to_string()));
771 }
772 };
773
774 let now = self.client.storage_client().clock().current_time();
776 let period = current_daily_period(initial_claim.timestamp.micros(), now.micros());
777 let last_period = self
778 .faucet_storage
779 .last_daily_claim_period(&request.owner)
780 .await?
781 .unwrap_or(0);
782
783 if period <= last_period {
784 return Err(Error::new(
785 "You have already claimed tokens for this period",
786 ));
787 }
788 } else {
789 match self.faucet_storage.get_chain_id(&request.owner).await {
790 Ok(None) => {}
791 Ok(Some(_)) => {
792 #[cfg(with_metrics)]
793 metrics::CLAIM_REQUESTS_TOTAL
794 .with_label_values(&["duplicate"])
795 .inc();
796 return Err(Error::new("This user already has a chain"));
797 }
798 Err(err) => {
799 tracing::error!("Database error: {err}");
800 return Err(Error::new(err.to_string()));
801 }
802 }
803 }
804 Ok(())
805 }
806
807 async fn check_rate_limiting(&self, requests: &[PendingRequest]) -> async_graphql::Result<()> {
810 let end_timestamp = self.config.end_timestamp;
811 let start_timestamp = self.config.start_timestamp;
812 let local_time = self.client.storage_client().clock().current_time();
813 let full_duration = end_timestamp.delta_since(start_timestamp).as_micros();
814 let remaining_duration = end_timestamp.delta_since(local_time).as_micros();
815 let balance = self.client.local_balance().await?;
816
817 #[cfg(with_metrics)]
818 metrics::FAUCET_BALANCE
819 .with_label_values(&[])
820 .set(u128::from(balance) as i64);
821
822 let total_amount = requests
823 .iter()
824 .fold(Amount::ZERO, |acc, r| acc.saturating_add(r.amount));
825 let Ok(remaining_balance) = balance.try_sub(total_amount) else {
826 #[cfg(with_metrics)]
828 metrics::INSUFFICIENT_BALANCE_REJECTIONS
829 .with_label_values(&[])
830 .inc();
831 return Err(Error::new("The faucet is empty."));
832 };
833
834 if multiply(u128::from(self.config.start_balance), remaining_duration)
837 > multiply(u128::from(remaining_balance), full_duration)
838 {
839 #[cfg(with_metrics)]
840 metrics::RATE_LIMIT_REJECTIONS.with_label_values(&[]).inc();
841 return Err(Error::new("Not enough unlocked balance; try again later."));
842 }
843 Ok(())
844 }
845
846 fn send_err(requests: Vec<PendingRequest>, err: impl Into<async_graphql::Error>) {
848 let err = err.into();
849 for request in requests {
850 request.send_err(err.clone());
851 }
852 }
853
854 async fn execute_batch(&mut self, requests: Vec<PendingRequest>) -> anyhow::Result<()> {
856 if let Err(err) = self.check_rate_limiting(&requests).await {
857 tracing::debug!("Rejecting requests due to rate limiting: {err:?}");
858 Self::send_err(requests, err);
859 return Ok(());
860 }
861
862 let mut operations = Vec::new();
864 for request in &requests {
865 let operation = if let Some(target_chain_id) = request.target_chain_id {
866 Operation::system(SystemOperation::Transfer {
867 owner: AccountOwner::CHAIN,
868 recipient: Account {
869 chain_id: target_chain_id,
870 owner: request.owner,
871 },
872 amount: request.amount,
873 })
874 } else {
875 let config = OpenChainConfig {
876 ownership: ChainOwnership::single(request.owner),
877 balance: request.amount,
878 application_permissions: ApplicationPermissions::default(),
879 };
880 Operation::system(SystemOperation::OpenChain(config))
881 };
882 operations.push(operation);
883 }
884
885 let execute_ops_start = Instant::now();
887 let result = self.client.execute_operations(operations, vec![]).await;
888 tracing::debug!(
889 execute_operations_ms = execute_ops_start.elapsed().as_millis(),
890 "execute_operations completed"
891 );
892 let waiting_for_lock_start = Instant::now();
893 self.context
894 .lock()
895 .await
896 .update_wallet(&self.client)
897 .await?;
898 tracing::debug!(
899 wait_time_ms = waiting_for_lock_start.elapsed().as_millis(),
900 "wallet updated after executing operations"
901 );
902 let certificate = match result {
903 Err(chain_client::Error::LocalNodeError(LocalNodeError::WorkerError(
904 WorkerError::ChainError(chain_err),
905 ))) => {
906 tracing::debug!(error=?chain_err, "local worker errored when executing operations");
907 match *chain_err {
908 ChainError::ExecutionError(exec_err, ChainExecutionContext::Operation(i))
909 if i > 0
910 && matches!(
911 *exec_err,
912 ExecutionError::BlockTooLarge
913 | ExecutionError::FeesExceedFunding { .. }
914 | ExecutionError::InsufficientBalance { .. }
915 | ExecutionError::MaximumFuelExceeded(_)
916 ) =>
917 {
918 tracing::error!(index=?i, %exec_err, "execution of operation failed; reducing batch size");
919
920 #[cfg(with_metrics)]
921 {
922 let error_type = match *exec_err {
923 ExecutionError::BlockTooLarge => "block_too_large",
924 ExecutionError::FeesExceedFunding { .. } => "fees_exceed_funding",
925 ExecutionError::InsufficientBalance { .. } => {
926 "insufficient_balance"
927 }
928 ExecutionError::MaximumFuelExceeded(_) => "fuel_exceeded",
929 _ => "other",
930 };
931 metrics::RETRYABLE_ERRORS
932 .with_label_values(&[error_type])
933 .inc();
934 }
935
936 self.config.max_batch_size = i as usize;
937 let mut pending_requests = self.pending_requests.lock().await;
939 for request in requests.into_iter().rev() {
940 pending_requests.push_front(request);
941 }
942 return Ok(()); }
944 chain_err => {
945 Self::send_err(requests, chain_err.to_string());
946 return Err(chain_client::Error::LocalNodeError(
947 LocalNodeError::WorkerError(WorkerError::ChainError(chain_err.into())),
948 )
949 .into());
950 }
951 }
952 }
953 Err(err) => {
954 tracing::debug!("Error executing operations: {err}");
955 Self::send_err(requests, err.to_string());
956 return Err(err.into());
957 }
958 Ok(ClientOutcome::Committed(certificate)) => certificate,
959 Ok(ClientOutcome::WaitForTimeout(timeout)) => {
960 let error_msg = format!(
961 "This faucet is using a multi-owner chain and is not the leader right now. \
962 Try again at {}",
963 timeout.timestamp,
964 );
965 Self::send_err(requests, error_msg.clone());
966 return Ok(());
967 }
968 Ok(ClientOutcome::Conflict(certificate)) => {
969 let error_msg = format!(
970 "A different block was committed at this height: {}",
971 certificate.hash(),
972 );
973 Self::send_err(requests, error_msg.clone());
974 return Ok(());
975 }
976 };
977
978 let certificate_hash = certificate.hash();
979 let block_timestamp = certificate.block().header.timestamp;
980
981 let chain_descriptions = extract_opened_single_owner_chains(&certificate)?;
983
984 let initial_desc_map: std::collections::HashMap<_, _> =
986 chain_descriptions.into_iter().collect();
987
988 let initial_chains: Vec<_> = initial_desc_map
990 .iter()
991 .map(|(owner, description)| (*owner, description.id()))
992 .collect();
993 let daily_claims: Vec<_> = requests
994 .iter()
995 .filter_map(|r| {
996 r.target_chain_id
997 .map(|chain_id| (r.owner, chain_id, r.daily_period))
998 })
999 .collect();
1000
1001 #[cfg(with_metrics)]
1002 let store_chains_start = std::time::Instant::now();
1003
1004 let store_initial = async {
1005 if initial_chains.is_empty() {
1006 return Ok(());
1007 }
1008 self.faucet_storage
1009 .store_chains_batch(initial_chains, block_timestamp)
1010 .await
1011 };
1012 let store_daily = async {
1013 if daily_claims.is_empty() {
1014 return Ok(());
1015 }
1016 self.faucet_storage
1017 .store_daily_claims_batch(daily_claims)
1018 .await
1019 };
1020
1021 if let Err(e) = futures::try_join!(store_initial, store_daily) {
1022 let error_msg = format!("Failed to save claims to database: {e}");
1023 Self::send_err(requests, error_msg.clone());
1024 anyhow::bail!(error_msg);
1025 }
1026 #[cfg(with_metrics)]
1027 {
1028 let elapsed_ms = store_chains_start.elapsed().as_secs_f64() * 1000.0;
1029 metrics::STORE_CHAIN_LATENCY
1030 .with_label_values(&[])
1031 .observe(elapsed_ms);
1032 }
1033
1034 #[cfg(with_metrics)]
1036 let chains_created = initial_desc_map.len();
1037
1038 for request in requests {
1039 #[cfg(with_metrics)]
1040 {
1041 let wait_time = request.queued_at.elapsed().as_secs_f64() * 1000.0;
1042 metrics::QUEUE_WAIT_TIME
1043 .with_label_values(&[])
1044 .observe(wait_time);
1045 }
1046
1047 let response = if let Some(target_chain_id) = request.target_chain_id {
1048 PendingResponse::Daily(Ok(ClaimOutcome {
1049 chain_id: target_chain_id,
1050 certificate_hash,
1051 amount: request.amount,
1052 }))
1053 } else if let Some(description) = initial_desc_map.get(&request.owner) {
1054 PendingResponse::Initial(Ok(Box::new(description.clone())))
1055 } else {
1056 PendingResponse::Initial(Err(Error::new(format!(
1057 "No chain created for owner {}",
1058 request.owner
1059 ))))
1060 };
1061 if request.responder.send(response).is_err() {
1062 tracing::warn!(
1063 "Receiver dropped while sending response to {}.",
1064 request.owner
1065 );
1066 }
1067 }
1068
1069 #[cfg(with_metrics)]
1070 metrics::CHAINS_CREATED_TOTAL
1071 .with_label_values(&[])
1072 .inc_by(chains_created as u64);
1073
1074 Ok(())
1075 }
1076}
1077
1078pub struct FaucetService<C>
1080where
1081 C: ClientContext,
1082{
1083 chain_id: ChainId,
1084 context: Arc<Mutex<C>>,
1085 client: ChainClient<C::Environment>,
1086 genesis_config: Arc<GenesisConfig>,
1087 config: ChainListenerConfig,
1088 storage: <C::Environment as linera_core::Environment>::Storage,
1089 port: u16,
1090 #[cfg(feature = "metrics")]
1091 metrics_port: u16,
1092 initial_claim_amount: Amount,
1093 daily_claim_amount: Amount,
1094 end_timestamp: Timestamp,
1095 start_timestamp: Timestamp,
1096 start_balance: Amount,
1097 faucet_storage: Arc<FaucetDatabase>,
1098 storage_path: PathBuf,
1099 pending_requests: Arc<Mutex<VecDeque<PendingRequest>>>,
1101 request_notifier: Arc<Notify>,
1102 max_batch_size: usize,
1103 enable_memory_profiling: bool,
1104}
1105
1106impl<C> Clone for FaucetService<C>
1107where
1108 C: ClientContext + 'static,
1109{
1110 fn clone(&self) -> Self {
1111 Self {
1112 chain_id: self.chain_id,
1113 context: Arc::clone(&self.context),
1114 client: self.client.clone(),
1115 genesis_config: Arc::clone(&self.genesis_config),
1116 config: self.config.clone(),
1117 storage: self.storage.clone(),
1118 port: self.port,
1119 #[cfg(feature = "metrics")]
1120 metrics_port: self.metrics_port,
1121 initial_claim_amount: self.initial_claim_amount,
1122 daily_claim_amount: self.daily_claim_amount,
1123 end_timestamp: self.end_timestamp,
1124 start_timestamp: self.start_timestamp,
1125 start_balance: self.start_balance,
1126 faucet_storage: Arc::clone(&self.faucet_storage),
1127 storage_path: self.storage_path.clone(),
1128 pending_requests: Arc::clone(&self.pending_requests),
1129 request_notifier: Arc::clone(&self.request_notifier),
1130 max_batch_size: self.max_batch_size,
1131 enable_memory_profiling: self.enable_memory_profiling,
1132 }
1133 }
1134}
1135
1136pub struct FaucetConfig {
1137 pub port: u16,
1138 #[cfg(feature = "metrics")]
1139 pub metrics_port: u16,
1140 pub chain_id: ChainId,
1141 pub initial_claim_amount: Amount,
1142 pub daily_claim_amount: Amount,
1143 pub end_timestamp: Timestamp,
1144 pub genesis_config: std::sync::Arc<GenesisConfig>,
1145 pub chain_listener_config: ChainListenerConfig,
1146 pub storage_path: PathBuf,
1147 pub max_batch_size: usize,
1148 pub enable_memory_profiling: bool,
1149}
1150
1151impl<C> FaucetService<C>
1152where
1153 C: ClientContext + 'static,
1154{
1155 pub async fn new(config: FaucetConfig, context: C) -> anyhow::Result<Self> {
1157 let storage = context.storage().clone();
1158 let client = context.make_chain_client(config.chain_id).await?;
1159 let context = Arc::new(Mutex::new(context));
1160 let start_timestamp = client.storage_client().clock().current_time();
1161 client.process_inbox().await?;
1162 let start_balance = client.local_balance().await?;
1163
1164 let storage_path = config.storage_path.clone();
1166
1167 let faucet_storage = FaucetDatabase::new(&storage_path)
1169 .await
1170 .context("Failed to initialize faucet database")?;
1171
1172 if let Err(e) = faucet_storage.sync_with_blockchain(&client).await {
1174 tracing::warn!("Failed to synchronize database with blockchain: {}", e);
1175 }
1176
1177 let faucet_storage = Arc::new(faucet_storage);
1178
1179 let pending_requests = Arc::new(Mutex::new(VecDeque::new()));
1181 let request_notifier = Arc::new(Notify::new());
1182
1183 Ok(Self {
1184 chain_id: config.chain_id,
1185 storage,
1186 context,
1187 client,
1188 genesis_config: config.genesis_config,
1189 config: config.chain_listener_config,
1190 port: config.port,
1191 #[cfg(feature = "metrics")]
1192 metrics_port: config.metrics_port,
1193 initial_claim_amount: config.initial_claim_amount,
1194 daily_claim_amount: config.daily_claim_amount,
1195 end_timestamp: config.end_timestamp,
1196 start_timestamp,
1197 start_balance,
1198 faucet_storage,
1199 storage_path,
1200 pending_requests,
1201 request_notifier,
1202 max_batch_size: config.max_batch_size,
1203 enable_memory_profiling: config.enable_memory_profiling,
1204 })
1205 }
1206
1207 fn schema(
1208 &self,
1209 ) -> Schema<
1210 QueryRoot<C>,
1211 MutationRoot<<C::Environment as linera_core::Environment>::Storage>,
1212 EmptySubscription,
1213 > {
1214 let mutation_root = MutationRoot {
1215 faucet_storage: Arc::clone(&self.faucet_storage),
1216 pending_requests: Arc::clone(&self.pending_requests),
1217 request_notifier: Arc::clone(&self.request_notifier),
1218 storage: self.storage.clone(),
1219 initial_claim_amount: self.initial_claim_amount,
1220 daily_claim_amount: self.daily_claim_amount,
1221 };
1222 let query_root = QueryRoot {
1223 genesis_config: Arc::clone(&self.genesis_config),
1224 client: self.client.clone(),
1225 faucet_storage: Arc::clone(&self.faucet_storage),
1226 };
1227 Schema::build(query_root, mutation_root, EmptySubscription).finish()
1228 }
1229
1230 #[cfg(feature = "metrics")]
1231 fn metrics_address(&self) -> SocketAddr {
1232 SocketAddr::from(([0, 0, 0, 0], self.metrics_port))
1233 }
1234
1235 #[tracing::instrument(name = "FaucetService::run", skip_all, fields(port = self.port, chain_id = ?self.chain_id))]
1237 pub async fn run(self, cancellation_token: CancellationToken) -> anyhow::Result<()> {
1238 let port = self.port;
1239 let index_handler = axum::routing::get(graphiql).post(Self::index_handler);
1240
1241 #[cfg(feature = "metrics")]
1242 monitoring_server::start_metrics_with_profiling(
1243 self.metrics_address(),
1244 cancellation_token.clone(),
1245 self.enable_memory_profiling,
1246 )
1247 .await;
1248
1249 let app = Router::new()
1250 .route("/", index_handler)
1251 .route("/ready", axum::routing::get(|| async { "ready!" }))
1252 .route_service("/ws", GraphQLSubscription::new(self.schema()))
1253 .layer(Extension(self.clone()))
1254 .layer(CorsLayer::permissive());
1255
1256 info!("GraphiQL IDE: http://localhost:{}", port);
1257
1258 let batch_processor_config = BatchProcessorConfig {
1260 end_timestamp: self.end_timestamp,
1261 start_timestamp: self.start_timestamp,
1262 start_balance: self.start_balance,
1263 max_batch_size: self.max_batch_size,
1264 };
1265 let mut batch_processor = BatchProcessor::new(
1266 batch_processor_config,
1267 Arc::clone(&self.context),
1268 self.client.clone(),
1269 Arc::clone(&self.faucet_storage),
1270 Arc::clone(&self.pending_requests),
1271 Arc::clone(&self.request_notifier),
1272 );
1273
1274 let chain_listener = ChainListener::new(
1275 self.config,
1276 self.context,
1277 self.storage,
1278 cancellation_token.clone(),
1279 tokio::sync::mpsc::unbounded_channel().1,
1280 false, )
1282 .run()
1283 .await?;
1284 let batch_processor_task = batch_processor.run(cancellation_token.clone());
1285 let tcp_listener =
1286 tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;
1287 let server = axum::serve(tcp_listener, app)
1288 .with_graceful_shutdown(cancellation_token.cancelled_owned())
1289 .into_future();
1290 futures::select! {
1291 result = Box::pin(chain_listener).fuse() => result?,
1292 _ = Box::pin(batch_processor_task).fuse() => {},
1293 result = Box::pin(server).fuse() => result?,
1294 };
1295
1296 Ok(())
1297 }
1298
1299 async fn index_handler(service: Extension<Self>, request: GraphQLRequest) -> GraphQLResponse {
1301 let schema = service.0.schema();
1302 schema.execute(request.into_inner()).await.into()
1303 }
1304}
1305
1306fn extract_opened_single_owner_chains(
1309 certificate: &ConfirmedBlockCertificate,
1310) -> anyhow::Result<Vec<(AccountOwner, ChainDescription)>> {
1311 let created_blobs = certificate.block().body.blobs.iter().flatten();
1312 created_blobs
1313 .filter_map(|blob| {
1314 if blob.content().blob_type() != BlobType::ChainDescription {
1315 return None;
1316 }
1317 let description = match bcs::from_bytes::<ChainDescription>(blob.bytes()) {
1318 Err(err) => return Some(Err(anyhow::anyhow!(err))),
1319 Ok(description) => description,
1320 };
1321 let chain_id = description.id();
1322 let owner = {
1323 let mut owners = description.config().ownership.all_owners();
1324 match (owners.next(), owners.next()) {
1325 (Some(owner), None) => *owner,
1326 (None, None) | (_, Some(_)) => {
1327 tracing::info!("Skipping chain {chain_id}; not exactly one owner.");
1328 return None;
1329 }
1330 }
1331 };
1332 tracing::debug!("Found chain {chain_id} created for owner {owner}",);
1333 Some(Ok((owner, description)))
1334 })
1335 .collect::<Result<Vec<_>, _>>()
1336}