safecoin_faucet/
faucet.rs

1//! The `faucet` module provides an object for launching a Safecoin Faucet,
2//! which is the custodian of any remaining lamports in a mint.
3//! The Safecoin Faucet builds and sends airdrop transactions,
4//! checking requests against a single-request cap and a per-IP limit
5//! for a given time slice.
6
7use {
8    bincode::{deserialize, serialize, serialized_size},
9    byteorder::{ByteOrder, LittleEndian},
10    crossbeam_channel::{unbounded, Sender},
11    log::*,
12    serde_derive::{Deserialize, Serialize},
13    solana_metrics::datapoint_info,
14    solana_sdk::{
15        hash::Hash,
16        instruction::Instruction,
17        message::Message,
18        native_token::lamports_to_sol,
19        packet::PACKET_DATA_SIZE,
20        pubkey::Pubkey,
21        signature::{Keypair, Signer},
22        system_instruction,
23        transaction::Transaction,
24    },
25    std::{
26        collections::{HashMap, HashSet},
27        io::{Read, Write},
28        net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
29        sync::{Arc, Mutex},
30        thread,
31        time::Duration,
32    },
33    thiserror::Error,
34    tokio::{
35        io::{AsyncReadExt, AsyncWriteExt},
36        net::{TcpListener, TcpStream as TokioTcpStream},
37        runtime::Runtime,
38    },
39};
40
/// Builds a `std::net::SocketAddr`.
///
/// * `socketaddr!(ip, port)` - `ip` is any value `Ipv4Addr::from` accepts (a `u32` or a
///   `[u8; 4]`); `port` is the port number.
/// * `socketaddr!(text)` - parses `text` with `str::parse`.
///
/// The expansion spells out the `std::net` types with absolute paths, so the macro works at
/// call sites that have not imported `SocketAddr` or `Ipv4Addr` themselves.
///
/// # Panics
///
/// The single-argument form panics if `text` is not a valid socket address.
#[macro_export]
macro_rules! socketaddr {
    ($ip:expr, $port:expr) => {
        ::std::net::SocketAddr::from((::std::net::Ipv4Addr::from($ip), $port))
    };
    ($str:expr) => {{
        let a: ::std::net::SocketAddr = $str.parse().unwrap();
        a
    }};
}
51
/// Reply written to a client whose request could not be served: two zero bytes, i.e. a
/// little-endian `u16` length prefix of zero with no transaction following it.
const ERROR_RESPONSE: [u8; 2] = [0; 2];

/// Length of the rate-limiting time slice, in seconds, used when none is supplied.
pub const TIME_SLICE: u64 = 60;
/// Faucet port as a number (presumably the default listening port; confirm with callers).
pub const FAUCET_PORT: u16 = 9900;
/// The same port as [`FAUCET_PORT`], as a string.
pub const FAUCET_PORT_STR: &str = "9900";
57
/// Errors produced by the faucet server and by the airdrop-request client in this module.
#[derive(Error, Debug)]
pub enum FaucetError {
    /// An underlying I/O operation failed; converted from `std::io::Error` via `#[from]`.
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),

    /// bincode failed to serialize or deserialize a value; converted via `#[from]`.
    #[error("serialization error: {0}")]
    Serialize(#[from] bincode::Error),

    /// The length reported for a transaction exceeds the permitted size; carries that length.
    #[error("transaction_length from faucet exceeds limit: {0}")]
    TransactionDataTooLarge(usize),

    /// The faucet answered with a length prefix of zero, i.e. no transaction was returned.
    #[error("transaction_length from faucet: 0")]
    NoDataReceived,

    /// A single request asked for more than the per-request cap.
    /// Fields: requested amount, cap. Callers in this file pass both through
    /// `lamports_to_sol` first.
    #[error("request too large; req: ◎{0}, cap: ◎{1}")]
    PerRequestCapExceeded(f64, f64),

    /// The running total for an IP address or recipient went over the per-time cap.
    /// Fields: requested amount, the limited key rendered as a string, new running total, cap.
    /// Amounts are passed through `lamports_to_sol` by the caller in this file.
    #[error("limit reached; req: ◎{0}, to: {1}, current: ◎{2}, cap: ◎{3}")]
    PerTimeCapExceeded(f64, String, f64, f64),
}
78
/// Request sent by a client to the faucet; it travels over the wire bincode-serialized.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum FaucetRequest {
    /// Ask the faucet for a signed transaction that airdrops funds.
    GetAirdrop {
        /// Amount requested, in lamports.
        lamports: u64,
        /// Recipient of the airdrop.
        to: Pubkey,
        /// Blockhash the faucet passes to `Transaction::new` when signing the reply.
        blockhash: Hash,
    },
}
87
/// The two kinds of signed transaction the faucet can hand back for a request.
pub enum FaucetTransaction {
    /// A system-program transfer from the faucet to the requested recipient.
    Airdrop(Transaction),
    /// A memo transaction explaining why the request was refused, paired with the memo text.
    Memo((Transaction, String)),
}
92
/// State of a running faucet: the signing keypair, the configured limits and the running
/// per-IP / per-recipient totals those limits are checked against.
pub struct Faucet {
    // Keypair that funds and signs every transaction the faucet returns.
    faucet_keypair: Keypair,
    // Running total of lamports requested per client IP address.
    ip_cache: HashMap<IpAddr, u64>,
    // Running total of lamports requested per recipient address.
    address_cache: HashMap<Pubkey, u64>,
    /// Length of the rate-limiting time slice. Nothing in this file acts on it; presumably the
    /// owner of the faucet calls `clear_caches` once per slice (confirm with callers).
    pub time_slice: Duration,
    // Optional cap, in lamports, on each running total in the two caches.
    per_time_cap: Option<u64>,
    // Optional cap, in lamports, on a single request.
    per_request_cap: Option<u64>,
    // IP addresses that skip the per-IP running-total check.
    allowed_ips: HashSet<IpAddr>,
}
102
103impl Faucet {
104    pub fn new(
105        faucet_keypair: Keypair,
106        time_input: Option<u64>,
107        per_time_cap: Option<u64>,
108        per_request_cap: Option<u64>,
109    ) -> Self {
110        Self::new_with_allowed_ips(
111            faucet_keypair,
112            time_input,
113            per_time_cap,
114            per_request_cap,
115            HashSet::new(),
116        )
117    }
118
119    pub fn new_with_allowed_ips(
120        faucet_keypair: Keypair,
121        time_input: Option<u64>,
122        per_time_cap: Option<u64>,
123        per_request_cap: Option<u64>,
124        allowed_ips: HashSet<IpAddr>,
125    ) -> Self {
126        let time_slice = Duration::new(time_input.unwrap_or(TIME_SLICE), 0);
127        if let Some((per_request_cap, per_time_cap)) = per_request_cap.zip(per_time_cap) {
128            if per_time_cap < per_request_cap {
129                warn!(
130                    "per_time_cap {} SAFE < per_request_cap {} SAFE; \
131                    maximum single requests will fail",
132                    lamports_to_sol(per_time_cap),
133                    lamports_to_sol(per_request_cap),
134                );
135            }
136        }
137        Self {
138            faucet_keypair,
139            ip_cache: HashMap::new(),
140            address_cache: HashMap::new(),
141            time_slice,
142            per_time_cap,
143            per_request_cap,
144            allowed_ips,
145        }
146    }
147
148    pub fn check_time_request_limit<T: LimitByTime + std::fmt::Display>(
149        &mut self,
150        request_amount: u64,
151        to: T,
152    ) -> Result<(), FaucetError> {
153        let new_total = to.check_cache(self, request_amount);
154        to.datapoint_info(request_amount, new_total);
155        if let Some(cap) = self.per_time_cap {
156            if new_total > cap {
157                return Err(FaucetError::PerTimeCapExceeded(
158                    lamports_to_sol(request_amount),
159                    to.to_string(),
160                    lamports_to_sol(new_total),
161                    lamports_to_sol(cap),
162                ));
163            }
164        }
165        Ok(())
166    }
167
168    pub fn clear_caches(&mut self) {
169        self.ip_cache.clear();
170        self.address_cache.clear();
171    }
172
173    /// Checks per-request and per-time-ip limits; if both pass, this method returns a signed
174    /// SystemProgram::Transfer transaction from the faucet keypair to the requested recipient. If
175    /// the request exceeds this per-request limit, this method returns a signed SPL Memo
176    /// transaction with the memo: "request too large; req: <REQUEST> SAFE cap: <CAP> SAFE"
177    pub fn build_airdrop_transaction(
178        &mut self,
179        req: FaucetRequest,
180        ip: IpAddr,
181    ) -> Result<FaucetTransaction, FaucetError> {
182        trace!("build_airdrop_transaction: {:?}", req);
183        match req {
184            FaucetRequest::GetAirdrop {
185                lamports,
186                to,
187                blockhash,
188            } => {
189                let mint_pubkey = self.faucet_keypair.pubkey();
190                info!(
191                    "Requesting airdrop of {} SAFE to {:?}",
192                    lamports_to_sol(lamports),
193                    to
194                );
195
196                if let Some(cap) = self.per_request_cap {
197                    if lamports > cap {
198                        let memo = format!(
199                            "{}",
200                            FaucetError::PerRequestCapExceeded(
201                                lamports_to_sol(lamports),
202                                lamports_to_sol(cap),
203                            )
204                        );
205                        let memo_instruction = Instruction {
206                            program_id: Pubkey::from(safe_memo::id().to_bytes()),
207                            accounts: vec![],
208                            data: memo.as_bytes().to_vec(),
209                        };
210                        let message = Message::new(&[memo_instruction], Some(&mint_pubkey));
211                        return Ok(FaucetTransaction::Memo((
212                            Transaction::new(&[&self.faucet_keypair], message, blockhash),
213                            memo,
214                        )));
215                    }
216                }
217                if !ip.is_loopback() && !self.allowed_ips.contains(&ip) {
218                    self.check_time_request_limit(lamports, ip)?;
219                }
220                self.check_time_request_limit(lamports, to)?;
221
222                let transfer_instruction =
223                    system_instruction::transfer(&mint_pubkey, &to, lamports);
224                let message = Message::new(&[transfer_instruction], Some(&mint_pubkey));
225                Ok(FaucetTransaction::Airdrop(Transaction::new(
226                    &[&self.faucet_keypair],
227                    message,
228                    blockhash,
229                )))
230            }
231        }
232    }
233
234    /// Deserializes a received airdrop request, and returns a serialized transaction
235    pub fn process_faucet_request(
236        &mut self,
237        bytes: &[u8],
238        ip: IpAddr,
239    ) -> Result<Vec<u8>, FaucetError> {
240        let req: FaucetRequest = deserialize(bytes)?;
241
242        info!("Airdrop transaction requested...{:?}", req);
243        let res = self.build_airdrop_transaction(req, ip);
244        match res {
245            Ok(tx) => {
246                let tx = match tx {
247                    FaucetTransaction::Airdrop(tx) => {
248                        info!("Airdrop transaction granted");
249                        tx
250                    }
251                    FaucetTransaction::Memo((tx, memo)) => {
252                        warn!("Memo transaction returned: {}", memo);
253                        tx
254                    }
255                };
256                let response_vec = bincode::serialize(&tx)?;
257
258                let mut response_vec_with_length = vec![0; 2];
259                LittleEndian::write_u16(&mut response_vec_with_length, response_vec.len() as u16);
260                response_vec_with_length.extend_from_slice(&response_vec);
261
262                Ok(response_vec_with_length)
263            }
264            Err(err) => {
265                warn!("Airdrop transaction failed: {}", err);
266                Err(err)
267            }
268        }
269    }
270}
271
/// Calls `solana_metrics::flush()` when the faucet is dropped (presumably so buffered
/// datapoints are not lost on shutdown; confirm against the metrics crate).
impl Drop for Faucet {
    fn drop(&mut self) {
        solana_metrics::flush();
    }
}
277
278pub fn request_airdrop_transaction(
279    faucet_addr: &SocketAddr,
280    id: &Pubkey,
281    lamports: u64,
282    blockhash: Hash,
283) -> Result<Transaction, FaucetError> {
284    info!(
285        "request_airdrop_transaction: faucet_addr={} id={} lamports={} blockhash={}",
286        faucet_addr, id, lamports, blockhash
287    );
288
289    let mut stream = TcpStream::connect_timeout(faucet_addr, Duration::new(3, 0))?;
290    stream.set_read_timeout(Some(Duration::new(10, 0)))?;
291    let req = FaucetRequest::GetAirdrop {
292        lamports,
293        blockhash,
294        to: *id,
295    };
296    let req = serialize(&req).expect("serialize faucet request");
297    stream.write_all(&req)?;
298
299    // Read length of transaction
300    let mut buffer = [0; 2];
301    stream.read_exact(&mut buffer).map_err(|err| {
302        info!(
303            "request_airdrop_transaction: buffer length read_exact error: {:?}",
304            err
305        );
306        err
307    })?;
308    let transaction_length = LittleEndian::read_u16(&buffer) as usize;
309    if transaction_length > PACKET_DATA_SIZE {
310        return Err(FaucetError::TransactionDataTooLarge(transaction_length));
311    } else if transaction_length == 0 {
312        return Err(FaucetError::NoDataReceived);
313    }
314
315    // Read the transaction
316    let mut buffer = Vec::new();
317    buffer.resize(transaction_length, 0);
318    stream.read_exact(&mut buffer).map_err(|err| {
319        info!(
320            "request_airdrop_transaction: buffer read_exact error: {:?}",
321            err
322        );
323        err
324    })?;
325
326    let transaction: Transaction = deserialize(&buffer)?;
327    Ok(transaction)
328}
329
330pub fn run_local_faucet_with_port(
331    faucet_keypair: Keypair,
332    sender: Sender<Result<SocketAddr, String>>,
333    per_time_cap: Option<u64>,
334    port: u16, // 0 => auto assign
335) {
336    thread::spawn(move || {
337        let faucet_addr = socketaddr!(0, port);
338        let faucet = Arc::new(Mutex::new(Faucet::new(
339            faucet_keypair,
340            None,
341            per_time_cap,
342            None,
343        )));
344        let runtime = Runtime::new().unwrap();
345        runtime.block_on(run_faucet(faucet, faucet_addr, Some(sender)));
346    });
347}
348
349// For integration tests. Listens on random open port and reports port to Sender.
350pub fn run_local_faucet(faucet_keypair: Keypair, per_time_cap: Option<u64>) -> SocketAddr {
351    let (sender, receiver) = unbounded();
352    run_local_faucet_with_port(faucet_keypair, sender, per_time_cap, 0);
353    receiver
354        .recv()
355        .expect("run_local_faucet")
356        .expect("faucet_addr")
357}
358
359pub async fn run_faucet(
360    faucet: Arc<Mutex<Faucet>>,
361    faucet_addr: SocketAddr,
362    sender: Option<Sender<Result<SocketAddr, String>>>,
363) {
364    let listener = TcpListener::bind(&faucet_addr).await;
365    if let Some(sender) = sender {
366        sender.send(
367            listener.as_ref().map(|listener| listener.local_addr().unwrap())
368                .map_err(|err| {
369                    format!(
370                        "Unable to bind faucet to {:?}, check the address is not already in use: {}",
371                        faucet_addr, err
372                    )
373                })
374            )
375            .unwrap();
376    }
377
378    let listener = match listener {
379        Err(err) => {
380            error!("Faucet failed to start: {}", err);
381            return;
382        }
383        Ok(listener) => listener,
384    };
385    info!("Faucet started. Listening on: {}", faucet_addr);
386    info!(
387        "Faucet account address: {}",
388        faucet.lock().unwrap().faucet_keypair.pubkey()
389    );
390
391    loop {
392        let _faucet = faucet.clone();
393        match listener.accept().await {
394            Ok((stream, _)) => {
395                tokio::spawn(async move {
396                    if let Err(e) = process(stream, _faucet).await {
397                        info!("failed to process request; error = {:?}", e);
398                    }
399                });
400            }
401            Err(e) => debug!("failed to accept socket; error = {:?}", e),
402        }
403    }
404}
405
406async fn process(
407    mut stream: TokioTcpStream,
408    faucet: Arc<Mutex<Faucet>>,
409) -> Result<(), Box<dyn std::error::Error>> {
410    let mut request = vec![
411        0u8;
412        serialized_size(&FaucetRequest::GetAirdrop {
413            lamports: u64::default(),
414            to: Pubkey::default(),
415            blockhash: Hash::default(),
416        })
417        .unwrap() as usize
418    ];
419    while stream.read_exact(&mut request).await.is_ok() {
420        trace!("{:?}", request);
421
422        let response = {
423            match stream.peer_addr() {
424                Err(e) => {
425                    info!("{:?}", e.into_inner());
426                    ERROR_RESPONSE.to_vec()
427                }
428                Ok(peer_addr) => {
429                    let ip = peer_addr.ip();
430                    info!("Request IP: {:?}", ip);
431
432                    match faucet.lock().unwrap().process_faucet_request(&request, ip) {
433                        Ok(response_bytes) => {
434                            trace!("Airdrop response_bytes: {:?}", response_bytes);
435                            response_bytes
436                        }
437                        Err(e) => {
438                            info!("Error in request: {}", e);
439                            ERROR_RESPONSE.to_vec()
440                        }
441                    }
442                }
443            }
444        };
445        stream.write_all(&response).await?;
446    }
447
448    Ok(())
449}
450
/// A key (such as an IP address or a recipient address) for which the faucet keeps a running
/// total of requested lamports.
pub trait LimitByTime {
    /// Adds `request_amount` to the running total stored for `self` in `faucet` and returns
    /// the new total.
    fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64;
    /// Reports the request amount and the new running total as a metrics datapoint.
    fn datapoint_info(&self, request_amount: u64, new_total: u64);
}
455
456impl LimitByTime for IpAddr {
457    fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
458        *faucet
459            .ip_cache
460            .entry(*self)
461            .and_modify(|total| *total = total.saturating_add(request_amount))
462            .or_insert(request_amount)
463    }
464
465    fn datapoint_info(&self, request_amount: u64, new_total: u64) {
466        datapoint_info!(
467            "faucet-airdrop",
468            ("request_amount", request_amount, i64),
469            ("ip", self.to_string(), String),
470            ("new_total", new_total, i64)
471        );
472    }
473}
474
475impl LimitByTime for Pubkey {
476    fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
477        *faucet
478            .address_cache
479            .entry(*self)
480            .and_modify(|total| *total = total.saturating_add(request_amount))
481            .or_insert(request_amount)
482    }
483
484    fn datapoint_info(&self, request_amount: u64, new_total: u64) {
485        datapoint_info!(
486            "faucet-airdrop",
487            ("request_amount", request_amount, i64),
488            ("address", self.to_string(), String),
489            ("new_total", new_total, i64)
490        );
491    }
492}
493
// Unit tests for the faucet's limit checks, transaction building and request processing.
#[cfg(test)]
mod tests {
    use {super::*, solana_sdk::system_instruction::SystemInstruction, std::time::Duration};

    // With a per-time cap of 2, two requests of 1 pass and the third is rejected, for an IP
    // address and for a recipient address alike.
    #[test]
    fn test_check_time_request_limit() {
        let keypair = Keypair::new();
        let mut faucet = Faucet::new(keypair, None, Some(2), None);
        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
        assert!(faucet.check_time_request_limit(1, ip).is_ok());
        assert!(faucet.check_time_request_limit(1, ip).is_ok());
        assert!(faucet.check_time_request_limit(1, ip).is_err());

        let address = Pubkey::new_unique();
        assert!(faucet.check_time_request_limit(1, address).is_ok());
        assert!(faucet.check_time_request_limit(1, address).is_ok());
        assert!(faucet.check_time_request_limit(1, address).is_err());
    }

    // `clear_caches` empties both the IP cache and the address cache.
    #[test]
    fn test_clear_caches() {
        let keypair = Keypair::new();
        let mut faucet = Faucet::new(keypair, None, None, None);
        let ip = socketaddr!([127, 0, 0, 1], 0).ip();
        assert_eq!(faucet.ip_cache.len(), 0);
        faucet.check_time_request_limit(1, ip).unwrap();
        assert_eq!(faucet.ip_cache.len(), 1);
        faucet.clear_caches();
        assert_eq!(faucet.ip_cache.len(), 0);
        assert!(faucet.ip_cache.is_empty());

        let address = Pubkey::new_unique();
        assert_eq!(faucet.address_cache.len(), 0);
        faucet.check_time_request_limit(1, address).unwrap();
        assert_eq!(faucet.address_cache.len(), 1);
        faucet.clear_caches();
        assert_eq!(faucet.address_cache.len(), 0);
        assert!(faucet.address_cache.is_empty());
    }

    // A `None` time input falls back to TIME_SLICE; the caps are stored as given.
    #[test]
    fn test_faucet_default_init() {
        let keypair = Keypair::new();
        let time_slice: Option<u64> = None;
        let per_time_cap: Option<u64> = Some(200);
        let per_request_cap: Option<u64> = Some(100);
        let faucet = Faucet::new(keypair, time_slice, per_time_cap, per_request_cap);
        assert_eq!(faucet.time_slice, Duration::new(TIME_SLICE, 0));
        assert_eq!(faucet.per_time_cap, per_time_cap);
        assert_eq!(faucet.per_request_cap, per_request_cap);
    }

    // Exercises the transfer path, the per-time cap (per IP and per recipient), the loopback
    // and allow-list exemptions, and the per-request cap memo path.
    #[test]
    fn test_faucet_build_airdrop_transaction() {
        let to = Pubkey::new_unique();
        let blockhash = Hash::default();
        let request = FaucetRequest::GetAirdrop {
            lamports: 2,
            to,
            blockhash,
        };
        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();

        let mint = Keypair::new();
        let mint_pubkey = mint.pubkey();
        let mut faucet = Faucet::new(mint, None, None, None);

        // No caps configured: the request yields a signed transfer transaction
        if let FaucetTransaction::Airdrop(tx) =
            faucet.build_airdrop_transaction(request, ip).unwrap()
        {
            let message = tx.message();

            assert_eq!(tx.signatures.len(), 1);
            assert_eq!(
                message.account_keys,
                vec![mint_pubkey, to, Pubkey::default()]
            );
            assert_eq!(message.recent_blockhash, blockhash);

            assert_eq!(message.instructions.len(), 1);
            let instruction: SystemInstruction =
                deserialize(&message.instructions[0].data).unwrap();
            assert_eq!(instruction, SystemInstruction::Transfer { lamports: 2 });
        } else {
            panic!("airdrop should succeed");
        }

        // Test per-time request cap
        let mint = Keypair::new();
        faucet = Faucet::new(mint, None, Some(2), None);
        let _tx = faucet.build_airdrop_transaction(request, ip).unwrap(); // first request succeeds
        let tx = faucet.build_airdrop_transaction(request, ip);
        assert!(tx.is_err());

        // Loopback skips the per-IP limit: one request per recipient succeeds, but a second
        // request to either recipient still hits the per-address limit
        let mint = Keypair::new();
        faucet = Faucet::new(mint, None, Some(2), None);
        let ip = socketaddr!([127, 0, 0, 1], 0).ip();
        let other = Pubkey::new_unique();
        let _tx0 = faucet.build_airdrop_transaction(request, ip).unwrap(); // first request to `to` succeeds
        let request1 = FaucetRequest::GetAirdrop {
            lamports: 2,
            to: other,
            blockhash,
        };
        let _tx1 = faucet.build_airdrop_transaction(request1, ip).unwrap(); // first request to `other` succeeds
        let tx0 = faucet.build_airdrop_transaction(request, ip);
        assert!(tx0.is_err());
        let tx1 = faucet.build_airdrop_transaction(request1, ip);
        assert!(tx1.is_err());

        // An allow-listed IP skips the per-IP limit in the same way: one request per recipient
        // succeeds, a second request to either recipient fails
        let mint = Keypair::new();
        let ip = socketaddr!([203, 0, 113, 1], 0).ip();
        let mut allowed_ips = HashSet::new();
        allowed_ips.insert(ip);
        faucet = Faucet::new_with_allowed_ips(mint, None, Some(2), None, allowed_ips);
        let other = Pubkey::new_unique();
        let _tx0 = faucet.build_airdrop_transaction(request, ip).unwrap(); // first request to `to` succeeds
        let request1 = FaucetRequest::GetAirdrop {
            lamports: 2,
            to: other,
            blockhash,
        };
        let _tx1 = faucet.build_airdrop_transaction(request1, ip).unwrap(); // first request to `other` succeeds
        let tx0 = faucet.build_airdrop_transaction(request, ip);
        assert!(tx0.is_err());
        let tx1 = faucet.build_airdrop_transaction(request1, ip);
        assert!(tx1.is_err());

        // Test per-request cap
        let mint = Keypair::new();
        let mint_pubkey = mint.pubkey();
        let mut faucet = Faucet::new(mint, None, None, Some(1));

        // A request of 2 against a cap of 1 yields a memo transaction instead of a transfer
        if let FaucetTransaction::Memo((tx, memo)) =
            faucet.build_airdrop_transaction(request, ip).unwrap()
        {
            let message = tx.message();

            assert_eq!(tx.signatures.len(), 1);
            assert_eq!(
                message.account_keys,
                vec![mint_pubkey, Pubkey::from(safe_memo::id().to_bytes())]
            );
            assert_eq!(message.recent_blockhash, blockhash);

            assert_eq!(message.instructions.len(), 1);
            let parsed_memo = std::str::from_utf8(&message.instructions[0].data).unwrap();
            let expected_memo = "request too large; req: ◎0.000000002, cap: ◎0.000000001";
            assert_eq!(parsed_memo, expected_memo);
            assert_eq!(memo, expected_memo);
        } else {
            panic!("airdrop attempt should result in memo tx");
        }
    }

    // A valid serialized request produces the length-prefixed serialized transfer transaction;
    // undecodable bytes produce an error.
    #[test]
    fn test_process_faucet_request() {
        let to = solana_sdk::pubkey::new_rand();
        let blockhash = Hash::new(to.as_ref());
        let lamports = 50;
        let req = FaucetRequest::GetAirdrop {
            lamports,
            blockhash,
            to,
        };
        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
        let req = serialize(&req).unwrap();

        // Build the expected reply by hand: u16 little-endian length, then the transaction
        let keypair = Keypair::new();
        let expected_instruction = system_instruction::transfer(&keypair.pubkey(), &to, lamports);
        let message = Message::new(&[expected_instruction], Some(&keypair.pubkey()));
        let expected_tx = Transaction::new(&[&keypair], message, blockhash);
        let expected_bytes = serialize(&expected_tx).unwrap();
        let mut expected_vec_with_length = vec![0; 2];
        LittleEndian::write_u16(&mut expected_vec_with_length, expected_bytes.len() as u16);
        expected_vec_with_length.extend_from_slice(&expected_bytes);

        let mut faucet = Faucet::new(keypair, None, None, None);
        let response = faucet.process_faucet_request(&req, ip);
        let response_vec = response.unwrap().to_vec();
        assert_eq!(expected_vec_with_length, response_vec);

        let bad_bytes = "bad bytes".as_bytes();
        assert!(faucet.process_faucet_request(bad_bytes, ip).is_err());
    }
}