1use {
8 bincode::{deserialize, serialize, serialized_size},
9 byteorder::{ByteOrder, LittleEndian},
10 crossbeam_channel::{unbounded, Sender},
11 log::*,
12 serde_derive::{Deserialize, Serialize},
13 solana_metrics::datapoint_info,
14 solana_sdk::{
15 hash::Hash,
16 instruction::Instruction,
17 message::Message,
18 native_token::lamports_to_sol,
19 packet::PACKET_DATA_SIZE,
20 pubkey::Pubkey,
21 signature::{Keypair, Signer},
22 system_instruction,
23 transaction::Transaction,
24 },
25 std::{
26 collections::{HashMap, HashSet},
27 io::{Read, Write},
28 net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
29 sync::{Arc, Mutex},
30 thread,
31 time::Duration,
32 },
33 thiserror::Error,
34 tokio::{
35 io::{AsyncReadExt, AsyncWriteExt},
36 net::{TcpListener, TcpStream as TokioTcpStream},
37 runtime::Runtime,
38 },
39};
40
/// Builds a [`std::net::SocketAddr`].
///
/// Two forms are accepted:
///
/// * `socketaddr!(ip, port)` — `ip` is any value `Ipv4Addr::from` accepts
///   (for example `[u8; 4]` or a `u32`), `port` is the port number.
/// * `socketaddr!(text)` — parses `text` (e.g. `"127.0.0.1:8080"`).
///
/// The expansion uses fully qualified `::std::net` paths so callers do not
/// need `SocketAddr` or `Ipv4Addr` in scope.
///
/// # Panics
///
/// The single-argument form panics if the text is not a valid socket address.
#[macro_export]
macro_rules! socketaddr {
    ($ip:expr, $port:expr) => {
        ::std::net::SocketAddr::from((::std::net::Ipv4Addr::from($ip), $port))
    };
    ($str:expr) => {{
        let a: ::std::net::SocketAddr = $str.parse().unwrap();
        a
    }};
}
51
/// Bytes sent back to a client when its request could not be served: a
/// little-endian `u16` of zero, i.e. a length prefix announcing no payload.
const ERROR_RESPONSE: [u8; 2] = u16::to_le_bytes(0);

/// Default length of a faucet's `time_slice`, in seconds.
pub const TIME_SLICE: u64 = 60;

/// Faucet port number (presumably the default port; confirm against callers).
pub const FAUCET_PORT: u16 = 9900;

/// Textual form of [`FAUCET_PORT`].
pub const FAUCET_PORT_STR: &str = "9900";
57
58#[derive(Error, Debug)]
59pub enum FaucetError {
60 #[error("IO Error: {0}")]
61 IoError(#[from] std::io::Error),
62
63 #[error("serialization error: {0}")]
64 Serialize(#[from] bincode::Error),
65
66 #[error("transaction_length from faucet exceeds limit: {0}")]
67 TransactionDataTooLarge(usize),
68
69 #[error("transaction_length from faucet: 0")]
70 NoDataReceived,
71
72 #[error("request too large; req: ◎{0}, cap: ◎{1}")]
73 PerRequestCapExceeded(f64, f64),
74
75 #[error("limit reached; req: ◎{0}, to: {1}, current: ◎{2}, cap: ◎{3}")]
76 PerTimeCapExceeded(f64, String, f64, f64),
77}
78
79#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
80pub enum FaucetRequest {
81 GetAirdrop {
82 lamports: u64,
83 to: Pubkey,
84 blockhash: Hash,
85 },
86}
87
88pub enum FaucetTransaction {
89 Airdrop(Transaction),
90 Memo((Transaction, String)),
91}
92
93pub struct Faucet {
94 faucet_keypair: Keypair,
95 ip_cache: HashMap<IpAddr, u64>,
96 address_cache: HashMap<Pubkey, u64>,
97 pub time_slice: Duration,
98 per_time_cap: Option<u64>,
99 per_request_cap: Option<u64>,
100 allowed_ips: HashSet<IpAddr>,
101}
102
103impl Faucet {
104 pub fn new(
105 faucet_keypair: Keypair,
106 time_input: Option<u64>,
107 per_time_cap: Option<u64>,
108 per_request_cap: Option<u64>,
109 ) -> Self {
110 Self::new_with_allowed_ips(
111 faucet_keypair,
112 time_input,
113 per_time_cap,
114 per_request_cap,
115 HashSet::new(),
116 )
117 }
118
119 pub fn new_with_allowed_ips(
120 faucet_keypair: Keypair,
121 time_input: Option<u64>,
122 per_time_cap: Option<u64>,
123 per_request_cap: Option<u64>,
124 allowed_ips: HashSet<IpAddr>,
125 ) -> Self {
126 let time_slice = Duration::new(time_input.unwrap_or(TIME_SLICE), 0);
127 if let Some((per_request_cap, per_time_cap)) = per_request_cap.zip(per_time_cap) {
128 if per_time_cap < per_request_cap {
129 warn!(
130 "per_time_cap {} SAFE < per_request_cap {} SAFE; \
131 maximum single requests will fail",
132 lamports_to_sol(per_time_cap),
133 lamports_to_sol(per_request_cap),
134 );
135 }
136 }
137 Self {
138 faucet_keypair,
139 ip_cache: HashMap::new(),
140 address_cache: HashMap::new(),
141 time_slice,
142 per_time_cap,
143 per_request_cap,
144 allowed_ips,
145 }
146 }
147
148 pub fn check_time_request_limit<T: LimitByTime + std::fmt::Display>(
149 &mut self,
150 request_amount: u64,
151 to: T,
152 ) -> Result<(), FaucetError> {
153 let new_total = to.check_cache(self, request_amount);
154 to.datapoint_info(request_amount, new_total);
155 if let Some(cap) = self.per_time_cap {
156 if new_total > cap {
157 return Err(FaucetError::PerTimeCapExceeded(
158 lamports_to_sol(request_amount),
159 to.to_string(),
160 lamports_to_sol(new_total),
161 lamports_to_sol(cap),
162 ));
163 }
164 }
165 Ok(())
166 }
167
168 pub fn clear_caches(&mut self) {
169 self.ip_cache.clear();
170 self.address_cache.clear();
171 }
172
173 pub fn build_airdrop_transaction(
178 &mut self,
179 req: FaucetRequest,
180 ip: IpAddr,
181 ) -> Result<FaucetTransaction, FaucetError> {
182 trace!("build_airdrop_transaction: {:?}", req);
183 match req {
184 FaucetRequest::GetAirdrop {
185 lamports,
186 to,
187 blockhash,
188 } => {
189 let mint_pubkey = self.faucet_keypair.pubkey();
190 info!(
191 "Requesting airdrop of {} SAFE to {:?}",
192 lamports_to_sol(lamports),
193 to
194 );
195
196 if let Some(cap) = self.per_request_cap {
197 if lamports > cap {
198 let memo = format!(
199 "{}",
200 FaucetError::PerRequestCapExceeded(
201 lamports_to_sol(lamports),
202 lamports_to_sol(cap),
203 )
204 );
205 let memo_instruction = Instruction {
206 program_id: Pubkey::from(safe_memo::id().to_bytes()),
207 accounts: vec![],
208 data: memo.as_bytes().to_vec(),
209 };
210 let message = Message::new(&[memo_instruction], Some(&mint_pubkey));
211 return Ok(FaucetTransaction::Memo((
212 Transaction::new(&[&self.faucet_keypair], message, blockhash),
213 memo,
214 )));
215 }
216 }
217 if !ip.is_loopback() && !self.allowed_ips.contains(&ip) {
218 self.check_time_request_limit(lamports, ip)?;
219 }
220 self.check_time_request_limit(lamports, to)?;
221
222 let transfer_instruction =
223 system_instruction::transfer(&mint_pubkey, &to, lamports);
224 let message = Message::new(&[transfer_instruction], Some(&mint_pubkey));
225 Ok(FaucetTransaction::Airdrop(Transaction::new(
226 &[&self.faucet_keypair],
227 message,
228 blockhash,
229 )))
230 }
231 }
232 }
233
234 pub fn process_faucet_request(
236 &mut self,
237 bytes: &[u8],
238 ip: IpAddr,
239 ) -> Result<Vec<u8>, FaucetError> {
240 let req: FaucetRequest = deserialize(bytes)?;
241
242 info!("Airdrop transaction requested...{:?}", req);
243 let res = self.build_airdrop_transaction(req, ip);
244 match res {
245 Ok(tx) => {
246 let tx = match tx {
247 FaucetTransaction::Airdrop(tx) => {
248 info!("Airdrop transaction granted");
249 tx
250 }
251 FaucetTransaction::Memo((tx, memo)) => {
252 warn!("Memo transaction returned: {}", memo);
253 tx
254 }
255 };
256 let response_vec = bincode::serialize(&tx)?;
257
258 let mut response_vec_with_length = vec![0; 2];
259 LittleEndian::write_u16(&mut response_vec_with_length, response_vec.len() as u16);
260 response_vec_with_length.extend_from_slice(&response_vec);
261
262 Ok(response_vec_with_length)
263 }
264 Err(err) => {
265 warn!("Airdrop transaction failed: {}", err);
266 Err(err)
267 }
268 }
269 }
270}
271
272impl Drop for Faucet {
273 fn drop(&mut self) {
274 solana_metrics::flush();
275 }
276}
277
278pub fn request_airdrop_transaction(
279 faucet_addr: &SocketAddr,
280 id: &Pubkey,
281 lamports: u64,
282 blockhash: Hash,
283) -> Result<Transaction, FaucetError> {
284 info!(
285 "request_airdrop_transaction: faucet_addr={} id={} lamports={} blockhash={}",
286 faucet_addr, id, lamports, blockhash
287 );
288
289 let mut stream = TcpStream::connect_timeout(faucet_addr, Duration::new(3, 0))?;
290 stream.set_read_timeout(Some(Duration::new(10, 0)))?;
291 let req = FaucetRequest::GetAirdrop {
292 lamports,
293 blockhash,
294 to: *id,
295 };
296 let req = serialize(&req).expect("serialize faucet request");
297 stream.write_all(&req)?;
298
299 let mut buffer = [0; 2];
301 stream.read_exact(&mut buffer).map_err(|err| {
302 info!(
303 "request_airdrop_transaction: buffer length read_exact error: {:?}",
304 err
305 );
306 err
307 })?;
308 let transaction_length = LittleEndian::read_u16(&buffer) as usize;
309 if transaction_length > PACKET_DATA_SIZE {
310 return Err(FaucetError::TransactionDataTooLarge(transaction_length));
311 } else if transaction_length == 0 {
312 return Err(FaucetError::NoDataReceived);
313 }
314
315 let mut buffer = Vec::new();
317 buffer.resize(transaction_length, 0);
318 stream.read_exact(&mut buffer).map_err(|err| {
319 info!(
320 "request_airdrop_transaction: buffer read_exact error: {:?}",
321 err
322 );
323 err
324 })?;
325
326 let transaction: Transaction = deserialize(&buffer)?;
327 Ok(transaction)
328}
329
330pub fn run_local_faucet_with_port(
331 faucet_keypair: Keypair,
332 sender: Sender<Result<SocketAddr, String>>,
333 per_time_cap: Option<u64>,
334 port: u16, ) {
336 thread::spawn(move || {
337 let faucet_addr = socketaddr!(0, port);
338 let faucet = Arc::new(Mutex::new(Faucet::new(
339 faucet_keypair,
340 None,
341 per_time_cap,
342 None,
343 )));
344 let runtime = Runtime::new().unwrap();
345 runtime.block_on(run_faucet(faucet, faucet_addr, Some(sender)));
346 });
347}
348
349pub fn run_local_faucet(faucet_keypair: Keypair, per_time_cap: Option<u64>) -> SocketAddr {
351 let (sender, receiver) = unbounded();
352 run_local_faucet_with_port(faucet_keypair, sender, per_time_cap, 0);
353 receiver
354 .recv()
355 .expect("run_local_faucet")
356 .expect("faucet_addr")
357}
358
359pub async fn run_faucet(
360 faucet: Arc<Mutex<Faucet>>,
361 faucet_addr: SocketAddr,
362 sender: Option<Sender<Result<SocketAddr, String>>>,
363) {
364 let listener = TcpListener::bind(&faucet_addr).await;
365 if let Some(sender) = sender {
366 sender.send(
367 listener.as_ref().map(|listener| listener.local_addr().unwrap())
368 .map_err(|err| {
369 format!(
370 "Unable to bind faucet to {:?}, check the address is not already in use: {}",
371 faucet_addr, err
372 )
373 })
374 )
375 .unwrap();
376 }
377
378 let listener = match listener {
379 Err(err) => {
380 error!("Faucet failed to start: {}", err);
381 return;
382 }
383 Ok(listener) => listener,
384 };
385 info!("Faucet started. Listening on: {}", faucet_addr);
386 info!(
387 "Faucet account address: {}",
388 faucet.lock().unwrap().faucet_keypair.pubkey()
389 );
390
391 loop {
392 let _faucet = faucet.clone();
393 match listener.accept().await {
394 Ok((stream, _)) => {
395 tokio::spawn(async move {
396 if let Err(e) = process(stream, _faucet).await {
397 info!("failed to process request; error = {:?}", e);
398 }
399 });
400 }
401 Err(e) => debug!("failed to accept socket; error = {:?}", e),
402 }
403 }
404}
405
406async fn process(
407 mut stream: TokioTcpStream,
408 faucet: Arc<Mutex<Faucet>>,
409) -> Result<(), Box<dyn std::error::Error>> {
410 let mut request = vec![
411 0u8;
412 serialized_size(&FaucetRequest::GetAirdrop {
413 lamports: u64::default(),
414 to: Pubkey::default(),
415 blockhash: Hash::default(),
416 })
417 .unwrap() as usize
418 ];
419 while stream.read_exact(&mut request).await.is_ok() {
420 trace!("{:?}", request);
421
422 let response = {
423 match stream.peer_addr() {
424 Err(e) => {
425 info!("{:?}", e.into_inner());
426 ERROR_RESPONSE.to_vec()
427 }
428 Ok(peer_addr) => {
429 let ip = peer_addr.ip();
430 info!("Request IP: {:?}", ip);
431
432 match faucet.lock().unwrap().process_faucet_request(&request, ip) {
433 Ok(response_bytes) => {
434 trace!("Airdrop response_bytes: {:?}", response_bytes);
435 response_bytes
436 }
437 Err(e) => {
438 info!("Error in request: {}", e);
439 ERROR_RESPONSE.to_vec()
440 }
441 }
442 }
443 }
444 };
445 stream.write_all(&response).await?;
446 }
447
448 Ok(())
449}
450
451pub trait LimitByTime {
452 fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64;
453 fn datapoint_info(&self, request_amount: u64, new_total: u64);
454}
455
456impl LimitByTime for IpAddr {
457 fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
458 *faucet
459 .ip_cache
460 .entry(*self)
461 .and_modify(|total| *total = total.saturating_add(request_amount))
462 .or_insert(request_amount)
463 }
464
465 fn datapoint_info(&self, request_amount: u64, new_total: u64) {
466 datapoint_info!(
467 "faucet-airdrop",
468 ("request_amount", request_amount, i64),
469 ("ip", self.to_string(), String),
470 ("new_total", new_total, i64)
471 );
472 }
473}
474
475impl LimitByTime for Pubkey {
476 fn check_cache(&self, faucet: &mut Faucet, request_amount: u64) -> u64 {
477 *faucet
478 .address_cache
479 .entry(*self)
480 .and_modify(|total| *total = total.saturating_add(request_amount))
481 .or_insert(request_amount)
482 }
483
484 fn datapoint_info(&self, request_amount: u64, new_total: u64) {
485 datapoint_info!(
486 "faucet-airdrop",
487 ("request_amount", request_amount, i64),
488 ("address", self.to_string(), String),
489 ("new_total", new_total, i64)
490 );
491 }
492}
493
#[cfg(test)]
mod tests {
    use {super::*, solana_sdk::system_instruction::SystemInstruction, std::time::Duration};

    /// With a per-time cap of 2, the third single-lamport request from the
    /// same IP (and, separately, to the same address) is rejected.
    #[test]
    fn test_check_time_request_limit() {
        let mut faucet = Faucet::new(Keypair::new(), None, Some(2), None);

        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
        assert!(faucet.check_time_request_limit(1, ip).is_ok());
        assert!(faucet.check_time_request_limit(1, ip).is_ok());
        assert!(faucet.check_time_request_limit(1, ip).is_err());

        let address = Pubkey::new_unique();
        assert!(faucet.check_time_request_limit(1, address).is_ok());
        assert!(faucet.check_time_request_limit(1, address).is_ok());
        assert!(faucet.check_time_request_limit(1, address).is_err());
    }

    /// `clear_caches` empties both the IP cache and the address cache.
    #[test]
    fn test_clear_caches() {
        let mut faucet = Faucet::new(Keypair::new(), None, None, None);

        let ip = socketaddr!([127, 0, 0, 1], 0).ip();
        assert_eq!(faucet.ip_cache.len(), 0);
        faucet.check_time_request_limit(1, ip).unwrap();
        assert_eq!(faucet.ip_cache.len(), 1);
        faucet.clear_caches();
        assert_eq!(faucet.ip_cache.len(), 0);
        assert!(faucet.ip_cache.is_empty());

        let address = Pubkey::new_unique();
        assert_eq!(faucet.address_cache.len(), 0);
        faucet.check_time_request_limit(1, address).unwrap();
        assert_eq!(faucet.address_cache.len(), 1);
        faucet.clear_caches();
        assert_eq!(faucet.address_cache.len(), 0);
        assert!(faucet.address_cache.is_empty());
    }

    /// Constructor stores the caps and falls back to the default time slice.
    #[test]
    fn test_faucet_default_init() {
        let time_slice: Option<u64> = None;
        let per_time_cap: Option<u64> = Some(200);
        let per_request_cap: Option<u64> = Some(100);

        let faucet = Faucet::new(Keypair::new(), time_slice, per_time_cap, per_request_cap);

        assert_eq!(faucet.time_slice, Duration::new(TIME_SLICE, 0));
        assert_eq!(faucet.per_time_cap, per_time_cap);
        assert_eq!(faucet.per_request_cap, per_request_cap);
    }

    #[test]
    fn test_faucet_build_airdrop_transaction() {
        let to = Pubkey::new_unique();
        let blockhash = Hash::default();
        let request = FaucetRequest::GetAirdrop {
            lamports: 2,
            to,
            blockhash,
        };
        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();

        // No caps: the request yields a plain transfer transaction.
        let mint = Keypair::new();
        let mint_pubkey = mint.pubkey();
        let mut faucet = Faucet::new(mint, None, None, None);

        match faucet.build_airdrop_transaction(request, ip).unwrap() {
            FaucetTransaction::Airdrop(tx) => {
                let message = tx.message();

                assert_eq!(tx.signatures.len(), 1);
                assert_eq!(
                    message.account_keys,
                    vec![mint_pubkey, to, Pubkey::default()]
                );
                assert_eq!(message.recent_blockhash, blockhash);

                assert_eq!(message.instructions.len(), 1);
                let instruction: SystemInstruction =
                    deserialize(&message.instructions[0].data).unwrap();
                assert_eq!(instruction, SystemInstruction::Transfer { lamports: 2 });
            }
            FaucetTransaction::Memo(_) => panic!("airdrop should succeed"),
        }

        // Per-time cap of 2: the first request fits, the second does not.
        let mint = Keypair::new();
        faucet = Faucet::new(mint, None, Some(2), None);
        let _tx = faucet.build_airdrop_transaction(request, ip).unwrap();
        let tx = faucet.build_airdrop_transaction(request, ip);
        assert!(tx.is_err());

        // Loopback IP: no per-IP limit, but each address is still limited.
        let mint = Keypair::new();
        faucet = Faucet::new(mint, None, Some(2), None);
        let ip = socketaddr!([127, 0, 0, 1], 0).ip();
        let other = Pubkey::new_unique();
        let _tx0 = faucet.build_airdrop_transaction(request, ip).unwrap();
        let request1 = FaucetRequest::GetAirdrop {
            lamports: 2,
            to: other,
            blockhash,
        };
        let _tx1 = faucet.build_airdrop_transaction(request1, ip).unwrap();
        let tx0 = faucet.build_airdrop_transaction(request, ip);
        assert!(tx0.is_err());
        let tx1 = faucet.build_airdrop_transaction(request1, ip);
        assert!(tx1.is_err());

        // Allow-listed IP: same expectations as for loopback.
        let mint = Keypair::new();
        let ip = socketaddr!([203, 0, 113, 1], 0).ip();
        let mut allowed_ips = HashSet::new();
        allowed_ips.insert(ip);
        faucet = Faucet::new_with_allowed_ips(mint, None, Some(2), None, allowed_ips);
        let other = Pubkey::new_unique();
        let _tx0 = faucet.build_airdrop_transaction(request, ip).unwrap();
        let request1 = FaucetRequest::GetAirdrop {
            lamports: 2,
            to: other,
            blockhash,
        };
        let _tx1 = faucet.build_airdrop_transaction(request1, ip).unwrap();
        let tx0 = faucet.build_airdrop_transaction(request, ip);
        assert!(tx0.is_err());
        let tx1 = faucet.build_airdrop_transaction(request1, ip);
        assert!(tx1.is_err());

        // Per-request cap of 1: a 2-lamport request yields a memo transaction.
        let mint = Keypair::new();
        let mint_pubkey = mint.pubkey();
        let mut faucet = Faucet::new(mint, None, None, Some(1));

        match faucet.build_airdrop_transaction(request, ip).unwrap() {
            FaucetTransaction::Memo((tx, memo)) => {
                let message = tx.message();

                assert_eq!(tx.signatures.len(), 1);
                assert_eq!(
                    message.account_keys,
                    vec![mint_pubkey, Pubkey::from(safe_memo::id().to_bytes())]
                );
                assert_eq!(message.recent_blockhash, blockhash);

                assert_eq!(message.instructions.len(), 1);
                let parsed_memo = std::str::from_utf8(&message.instructions[0].data).unwrap();
                let expected_memo = "request too large; req: ◎0.000000002, cap: ◎0.000000001";
                assert_eq!(parsed_memo, expected_memo);
                assert_eq!(memo, expected_memo);
            }
            FaucetTransaction::Airdrop(_) => panic!("airdrop attempt should result in memo tx"),
        }
    }

    /// A valid request produces the length-prefixed expected transaction;
    /// undecodable bytes produce an error.
    #[test]
    fn test_process_faucet_request() {
        let to = solana_sdk::pubkey::new_rand();
        let blockhash = Hash::new(to.as_ref());
        let lamports = 50;
        let ip = socketaddr!([203, 0, 113, 1], 1234).ip();
        let req = serialize(&FaucetRequest::GetAirdrop {
            lamports,
            blockhash,
            to,
        })
        .unwrap();

        // Build by hand what the faucet is expected to return.
        let keypair = Keypair::new();
        let expected_instruction = system_instruction::transfer(&keypair.pubkey(), &to, lamports);
        let message = Message::new(&[expected_instruction], Some(&keypair.pubkey()));
        let expected_tx = Transaction::new(&[&keypair], message, blockhash);
        let expected_bytes = serialize(&expected_tx).unwrap();
        let mut expected_vec_with_length = vec![0; 2];
        LittleEndian::write_u16(&mut expected_vec_with_length, expected_bytes.len() as u16);
        expected_vec_with_length.extend_from_slice(&expected_bytes);

        let mut faucet = Faucet::new(keypair, None, None, None);
        let response_vec = faucet.process_faucet_request(&req, ip).unwrap().to_vec();
        assert_eq!(expected_vec_with_length, response_vec);

        let bad_bytes = "bad bytes".as_bytes();
        assert!(faucet.process_faucet_request(bad_bytes, ip).is_err());
    }
}