1use std::cmp::min;
4
5use anchor_lang::prelude::*;
6
7use crate::{error::StreamError, utils::get_current_timestamp};
8
9const ANCHOR_DISCRIMINATOR_LENGTH: usize = 8;
10
11const BOOL_LENGTH: usize = 1;
12const U8_LENGTH: usize = 1;
13const U64_LENGTH: usize = 8;
14const PUBLIC_KEY_LENGTH: usize = 32;
15const STRING_LENGTH_PREFIX: usize = 4;
16
17pub const MIN_STREAM_NAME_LENGTH: usize = 2;
19pub const MAX_STREAM_NAME_LENGTH: usize = 100;
21
22pub const DEPOSIT_AMOUNT_PERIOD_IN_SECS: u64 = 8 * 60 * 60; #[account]
41#[derive(Debug, PartialEq, Eq)]
42pub struct Stream {
43 pub is_prepaid: bool,
46
47 pub mint: Pubkey,
49 pub sender: Pubkey,
51 pub recipient: Pubkey,
53
54 pub created_at: u64,
56 pub starts_at: u64,
60 pub ends_at: u64,
65
66 pub initial_amount: u64,
68 pub flow_interval: u64,
70 pub flow_rate: u64,
72
73 pub is_cancelled: bool,
75 pub is_cancelled_before_start: bool,
79 pub is_cancelled_by_sender: bool,
83
84 pub cancelled_at: u64,
89
90 pub sender_can_cancel: bool,
92 pub sender_can_cancel_at: u64,
94
95 pub sender_can_change_sender: bool,
99 pub sender_can_change_sender_at: u64,
103
104 pub is_paused: bool,
108 pub is_paused_by_sender: bool,
113
114 pub sender_can_pause: bool,
118 pub sender_can_pause_at: u64,
122
123 pub recipient_can_resume_pause_by_sender: bool,
127 pub recipient_can_resume_pause_by_sender_at: u64,
131
132 pub anyone_can_withdraw_for_recipient: bool,
134 pub anyone_can_withdraw_for_recipient_at: u64,
136
137 pub last_resumed_at: u64,
142 pub accumulated_active_time: u64,
148
149 pub total_withdrawn_amount: u64,
154 pub last_withdrawn_at: u64,
156 pub last_withdrawn_amount: u64,
158
159 pub total_topup_amount: u64,
164 pub last_topup_at: u64,
166 pub last_topup_amount: u64,
168
169 pub deposit_needed: u64,
177
178 pub reserved: [u64; 16],
180
181 pub seed: u64,
184 pub bump: u8,
186
187 pub name: String,
191}
192
193impl Stream {
230 const BASE_LENGTH: usize = ANCHOR_DISCRIMINATOR_LENGTH
232 + 1 * BOOL_LENGTH + 3 * PUBLIC_KEY_LENGTH + 3 * U64_LENGTH + 3 * U64_LENGTH + 3 * BOOL_LENGTH + 1 * U64_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 2 * BOOL_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 2 * U64_LENGTH + 3 * U64_LENGTH + 3 * U64_LENGTH + 1 * U64_LENGTH + 16 * U64_LENGTH + 1 * U64_LENGTH + 1 * U8_LENGTH ;
257
258 pub fn space(name: &str) -> usize {
259 Self::BASE_LENGTH + STRING_LENGTH_PREFIX + name.len()
260 }
261
262 pub fn has_flow_payments(&self) -> bool {
265 self.flow_rate > 0 && (self.ends_at == 0 || self.ends_at > self.starts_at)
266 }
267
268 pub fn get_prepaid_amount_needed(&self) -> Result<u64> {
270 if !self.is_prepaid || self.ends_at == 0 {
271 Ok(0)
272 } else if !self.has_flow_payments() {
273 Ok(self.initial_amount)
274 } else {
275 self.initial_amount
276 .checked_add(
277 ((self.ends_at - self.starts_at)
278 .checked_mul(self.flow_rate)
279 .ok_or(error!(StreamError::PrepaidAmountNeededOutOfBounds))?)
280 / self.flow_interval,
281 )
282 .ok_or(error!(StreamError::PrepaidAmountNeededOutOfBounds))
283 }
284 }
285
286 pub fn get_deposit_needed(&self) -> Result<u64> {
289 Ok(if self.is_prepaid || !self.has_flow_payments() {
290 0
291 } else {
292 let deposit_needed = if self.ends_at == 0 {
293 DEPOSIT_AMOUNT_PERIOD_IN_SECS
294 .checked_mul(self.flow_rate)
295 .ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?
296 / self.flow_interval
297 } else {
298 min(DEPOSIT_AMOUNT_PERIOD_IN_SECS, self.ends_at - self.starts_at)
299 .checked_mul(self.flow_rate)
300 .ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?
301 / self.flow_interval
302 };
303
304 if deposit_needed >= 10 {
305 deposit_needed
306 } else {
307 deposit_needed + 1
308 }
309 })
310 }
311
312 pub fn get_stops_at(&self) -> u64 {
313 let cancelled_at = self.cancelled_at;
314 let ends_at = self.ends_at;
315 if cancelled_at == 0 {
316 ends_at
317 } else if ends_at == 0 {
318 cancelled_at
319 } else {
320 min(ends_at, cancelled_at)
321 }
322 }
323
324 pub fn has_stopped(&self, at: u64) -> bool {
326 let stops_at = self.get_stops_at();
327 return stops_at > 0 && at > stops_at;
328 }
329
330 fn min_with_stopped_at(&self, at: u64) -> u64 {
331 let stops_at = self.get_stops_at();
332 if stops_at > 0 && at > stops_at {
333 stops_at
337 } else {
338 at
339 }
340 }
341
342 fn unsafe_get_active_time_after_start(&self, at: u64) -> Result<u64> {
344 Ok(if self.is_paused {
345 self.accumulated_active_time
347 } else if self.last_resumed_at == 0 {
348 at - self.starts_at
350 } else {
351 (at - self.last_resumed_at)
355 .checked_add(self.accumulated_active_time)
356 .ok_or(error!(StreamError::AmountAvailableToWithdrawOutOfBounds))?
357 })
358 }
359
360 pub fn get_max_acceptable_topup_amount(&self, at: u64) -> Result<(bool, u64)> {
362 Ok(if self.is_prepaid || !self.has_flow_payments() {
363 (false, 0)
364 } else {
365 let stops_at = self.get_stops_at();
367 if stops_at == 0 {
368 (true, 0)
370 } else if stops_at < self.starts_at {
371 (false, 0)
373 } else {
374 let total_possible_active_time = if at < self.starts_at {
377 self.ends_at - self.starts_at
380 } else {
381 if stops_at <= at {
384 self.unsafe_get_active_time_after_start(stops_at)?
386 } else {
387 self.unsafe_get_active_time_after_start(at)?
390 .checked_add(stops_at - at)
391 .ok_or(error!(StreamError::TopupAmountOutOfBounds))?
392 }
393 };
394
395 let total_possible_topup = if total_possible_active_time == 0 {
396 self.initial_amount
397 } else {
398 self.initial_amount
399 .checked_add(
400 (total_possible_active_time
401 .checked_mul(self.flow_rate)
402 .ok_or(error!(StreamError::TopupAmountOutOfBounds))?)
403 / self.flow_interval,
404 )
405 .ok_or(error!(StreamError::TopupAmountOutOfBounds))?
406 };
407
408 (
409 false,
410 if total_possible_topup <= self.total_topup_amount {
411 0
412 } else {
413 total_possible_topup - self.total_topup_amount
414 },
415 )
416 }
417 })
418 }
419
420 pub fn get_amount_owed(&self, at: u64) -> Result<u64> {
422 let at = self.min_with_stopped_at(at);
423
424 Ok(if at < self.starts_at {
425 0
426 } else if !self.has_flow_payments() {
427 self.initial_amount
428 } else {
429 let active_time = self.unsafe_get_active_time_after_start(at)?;
430 if active_time == 0 {
431 self.initial_amount
432 } else {
433 self.initial_amount
434 .checked_add(
435 (active_time
436 .checked_mul(self.flow_rate)
437 .ok_or(error!(StreamError::AmountAvailableToWithdrawOutOfBounds))?)
438 / self.flow_interval,
439 )
440 .ok_or(error!(StreamError::AmountAvailableToWithdrawOutOfBounds))?
441 }
442 })
443 }
444
445 fn mark_cancelled(&mut self, at: u64, signer: &Signer) {
446 self.is_cancelled = true;
447 self.is_cancelled_before_start = at < self.starts_at;
448 self.is_cancelled_by_sender = signer.key() == self.sender;
449 self.cancelled_at = at;
450 }
451
452 fn add_topup_amount(&mut self, at: u64, latest_topup_amount: u64) -> Result<()> {
453 self.total_topup_amount = self
454 .total_topup_amount
455 .checked_add(latest_topup_amount)
456 .ok_or(error!(StreamError::TopupAmountOutOfBounds))?;
457 self.last_topup_at = at;
458 self.last_topup_amount = latest_topup_amount;
459 Ok(())
460 }
461
462 fn add_withdrawn_amount(&mut self, at: u64, latest_withdrawn_amount: u64) -> Result<()> {
463 if latest_withdrawn_amount == 0 {
464 return Ok(());
465 }
466
467 self.total_withdrawn_amount = self
468 .total_withdrawn_amount
469 .checked_add(latest_withdrawn_amount)
470 .ok_or(error!(StreamError::WithdrawAmountOutOfBounds))?;
471 self.last_withdrawn_at = at;
472 self.last_withdrawn_amount = latest_withdrawn_amount;
473 Ok(())
474 }
475
476 pub fn initialize(
482 &mut self,
483 is_prepaid: bool,
484 mint: Pubkey,
485 sender: Pubkey,
486 recipient: Pubkey,
487 name: String,
488 starts_at: u64,
489 ends_at: u64,
490 initial_amount: u64,
491 flow_interval: u64,
492 flow_rate: u64,
493 sender_can_cancel: bool,
494 sender_can_cancel_at: u64,
495 sender_can_change_sender: bool,
496 sender_can_change_sender_at: u64,
497 sender_can_pause: bool,
498 sender_can_pause_at: u64,
499 recipient_can_resume_pause_by_sender: bool,
500 recipient_can_resume_pause_by_sender_at: u64,
501 anyone_can_withdraw_for_recipient: bool,
502 anyone_can_withdraw_for_recipient_at: u64,
503 seed: u64,
504 bump: u8,
505 ) -> Result<()> {
506 require!(recipient != Pubkey::default(), StreamError::EmptyRecipient);
507 require!(name.len() >= MIN_STREAM_NAME_LENGTH, StreamError::StreamNameTooShort);
508 require!(name.len() <= MAX_STREAM_NAME_LENGTH, StreamError::StreamNameTooLong);
509 require!(recipient != sender, StreamError::SameSenderAndRecipient);
510 require!(flow_interval > 0, StreamError::ZeroFlowInterval);
511
512 let at = get_current_timestamp()?;
513 let starts_at = if starts_at < at { at } else { starts_at };
514
515 require!(
516 (!is_prepaid && ends_at == 0) || ends_at >= starts_at,
517 StreamError::InvalidEndsAt,
518 );
519
520 let sender_can_cancel_at = if sender_can_cancel {
521 min(sender_can_cancel_at, at)
522 } else {
523 0
524 };
525 let sender_can_change_sender_at = if sender_can_change_sender {
526 min(sender_can_change_sender_at, at)
527 } else {
528 0
529 };
530 let sender_can_pause_at = if sender_can_pause {
531 min(sender_can_pause_at, at)
532 } else {
533 0
534 };
535 let recipient_can_resume_pause_by_sender_at = if recipient_can_resume_pause_by_sender {
536 min(recipient_can_resume_pause_by_sender_at, at)
537 } else {
538 0
539 };
540 let anyone_can_withdraw_for_recipient_at = if anyone_can_withdraw_for_recipient {
541 min(anyone_can_withdraw_for_recipient_at, at)
542 } else {
543 0
544 };
545
546 self.is_prepaid = is_prepaid;
547 self.is_cancelled = false;
548 self.is_cancelled_before_start = false;
549 self.is_cancelled_by_sender = false;
550 self.is_paused = false;
551 self.is_paused_by_sender = false;
552 self.mint = mint;
553 self.sender = sender;
554 self.recipient = recipient;
555 self.created_at = at;
556 self.starts_at = starts_at;
557 self.ends_at = ends_at;
558 self.initial_amount = initial_amount;
559 self.flow_interval = flow_interval;
560 self.flow_rate = flow_rate;
561 self.sender_can_cancel = sender_can_cancel;
562 self.sender_can_cancel_at = sender_can_cancel_at;
563 self.cancelled_at = 0;
564 self.sender_can_change_sender = sender_can_change_sender;
565 self.sender_can_change_sender_at = sender_can_change_sender_at;
566 self.sender_can_pause = sender_can_pause;
567 self.sender_can_pause_at = sender_can_pause_at;
568 self.recipient_can_resume_pause_by_sender = recipient_can_resume_pause_by_sender;
569 self.recipient_can_resume_pause_by_sender_at = recipient_can_resume_pause_by_sender_at;
570 self.anyone_can_withdraw_for_recipient = anyone_can_withdraw_for_recipient;
571 self.anyone_can_withdraw_for_recipient_at = anyone_can_withdraw_for_recipient_at;
572 self.last_resumed_at = 0;
573 self.accumulated_active_time = 0;
574 self.total_withdrawn_amount = 0;
575 self.last_withdrawn_at = 0;
576 self.last_withdrawn_amount = 0;
577 self.total_topup_amount = 0;
578 self.last_topup_at = 0;
579 self.last_topup_amount = 0;
580 self.deposit_needed = self.get_deposit_needed()?;
581 self.seed = seed;
582 self.bump = bump;
583 self.name = name;
584
585 require!(
586 self.initial_amount > 0 || self.has_flow_payments(),
587 StreamError::ZeroLifetimeAmount
588 );
589 Ok(())
590 }
591
592 pub fn initialize_prepaid(&mut self) -> Result<u64> {
594 let prepaid_amount_needed = self.get_prepaid_amount_needed()?;
595 require!(prepaid_amount_needed > 0, StreamError::ZeroLifetimeAmount);
596 self.add_topup_amount(get_current_timestamp()?, prepaid_amount_needed)?;
597 Ok(prepaid_amount_needed)
598 }
599
600 pub fn initialize_non_prepaid(&mut self, topup_amount: u64) -> Result<()> {
602 require!(topup_amount > 0, StreamError::ZeroAmount);
603
604 let amount_needed = self
609 .initial_amount
610 .checked_add(
611 self.deposit_needed
612 .checked_mul(2)
613 .ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?,
614 )
615 .ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?;
616 require!(topup_amount >= amount_needed, StreamError::AmountLessThanAmountNeeded);
617 self.add_topup_amount(get_current_timestamp()?, topup_amount - self.deposit_needed)
618 }
619
620 pub(crate) fn cancel(&mut self, key: Pubkey, signer: &Signer, recipient: Pubkey) -> Result<CancelTransferParams> {
621 require!(!self.is_cancelled, StreamError::StreamAlreadyCancelled);
622 require!(recipient == self.recipient, StreamError::InvalidRecipient);
623
624 let at = get_current_timestamp()?;
625 self.mark_cancelled(at, signer);
626
627 let total_topup_amount = self.total_topup_amount;
628 let amount_owed = self.get_amount_owed(at)?;
629 if total_topup_amount < amount_owed {
630 let transfer_amount_recipient = if total_topup_amount > self.total_withdrawn_amount {
632 total_topup_amount - self.total_withdrawn_amount
633 } else {
634 0
635 };
636 self.add_withdrawn_amount(at, transfer_amount_recipient)?;
637
638 if self.is_prepaid {
639 msg!("Prepaid stream [{}] is insolvent. THIS SHOULD NEVER HAPPEN!!!", key);
640 Ok(CancelTransferParams {
641 transfer_amount_sender: 0,
642 transfer_amount_signer: 0,
643 transfer_amount_recipient,
644 })
645 } else {
646 let transfer_amount_signer = self.deposit_needed;
649 self.deposit_needed = 0;
650 Ok(CancelTransferParams {
651 transfer_amount_sender: 0,
652 transfer_amount_signer,
653 transfer_amount_recipient,
654 })
655 }
656 } else {
657 let signer_key = signer.key();
659 require!(
660 signer_key == self.sender || signer_key == self.recipient,
661 StreamError::UserUnauthorizedToCancel,
662 );
663 require!(
664 signer_key != self.sender || (self.sender_can_cancel && self.sender_can_cancel_at <= at),
665 StreamError::SenderCannotCancel,
666 );
667
668 let transfer_amount_sender = total_topup_amount
671 .checked_add(self.deposit_needed)
672 .ok_or(error!(StreamError::CancellationRefundOutOfBounds))?
673 - amount_owed;
674
675 self.total_topup_amount = amount_owed;
676 self.deposit_needed = 0;
677
678 let transfer_amount_recipient = if amount_owed > self.total_withdrawn_amount {
679 amount_owed - self.total_withdrawn_amount
680 } else {
681 0
682 };
683 self.add_withdrawn_amount(at, transfer_amount_recipient)?;
684
685 Ok(CancelTransferParams {
686 transfer_amount_sender,
687 transfer_amount_signer: 0,
688 transfer_amount_recipient,
689 })
690 }
691 }
692
693 pub(crate) fn withdraw_excess_topup_non_prepaid_ended(&mut self) -> Result<u64> {
694 require!(!self.is_cancelled, StreamError::StreamAlreadyCancelled);
695
696 let at = get_current_timestamp()?;
697 require!(self.ends_at > 0 && self.ends_at < at, StreamError::StreamNotEnded);
698
699 let total_topup_amount = self.total_topup_amount;
700 let amount_owed = self.get_amount_owed(at)?;
701 Ok(if total_topup_amount < amount_owed {
702 0
704 } else {
705 let deposit_needed = self.deposit_needed;
706
707 self.total_topup_amount = amount_owed;
708 self.deposit_needed = 0;
709
710 total_topup_amount
711 .checked_add(deposit_needed)
712 .ok_or(error!(StreamError::CancellationRefundOutOfBounds))?
713 - amount_owed
714 })
715 }
716
717 pub(crate) fn topup_non_prepaid(&mut self, topup_amount: u64) -> Result<()> {
718 require!(topup_amount > 0, StreamError::ZeroAmount);
719 require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
720 require!(self.has_flow_payments(), StreamError::StreamHasNoFlowPayments);
721
722 let at = get_current_timestamp()?;
723 require!(!self.has_stopped(at), StreamError::StreamHasStopped);
724
725 let (no_limit, max_acceptable_topup) = self.get_max_acceptable_topup_amount(at)?;
726 if !no_limit && topup_amount > max_acceptable_topup {
727 require!(!self.has_stopped(at), StreamError::TopupAmountMoreThanMaxAcceptable);
728 }
729
730 self.add_topup_amount(get_current_timestamp()?, topup_amount)
731 }
732
733 pub(crate) fn change_sender_non_prepaid(&mut self, sender: &Signer, new_sender: Pubkey) -> Result<()> {
734 require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
735 require!(sender.key() == self.sender, StreamError::InvalidSender);
736 require!(new_sender != Pubkey::default(), StreamError::InvalidNewSender);
737 require!(new_sender != self.sender, StreamError::SameSenders);
738
739 let at = get_current_timestamp()?;
740 require!(
741 self.sender_can_change_sender && self.sender_can_change_sender_at <= at,
742 StreamError::SenderCannotChangeSender
743 );
744 require!(!self.has_stopped(at), StreamError::StreamHasStopped);
745
746 self.sender = new_sender;
747 Ok(())
748 }
749
750 pub(crate) fn withdraw_and_change_recipient(
751 &mut self,
752 signer: &Signer,
753 recipient: Pubkey,
754 new_recipient: Pubkey,
755 ) -> Result<u64> {
756 require!(recipient == self.recipient, StreamError::InvalidRecipient);
757
758 let at = get_current_timestamp()?;
759 require!(
760 signer.key() == self.recipient
761 || (self.anyone_can_withdraw_for_recipient && self.anyone_can_withdraw_for_recipient_at <= at),
762 StreamError::UserUnauthorizedToWithdraw,
763 );
764
765 let total_topup_amount = self.total_topup_amount;
766
767 let at = get_current_timestamp()?;
768 let mut amount_owed = self.get_amount_owed(at)?;
769 if amount_owed > total_topup_amount {
770 amount_owed = if self.is_cancelled {
773 total_topup_amount
774 } else {
775 self.mark_cancelled(at, signer);
776 total_topup_amount
777 .checked_add(self.deposit_needed)
778 .ok_or(error!(StreamError::WithdrawAmountOutOfBounds))?
779 }
780 }
781
782 require!(
783 amount_owed >= self.total_withdrawn_amount,
784 StreamError::WithdrawnAmountGreaterThanAmountOwed,
785 );
786 let amount_available_to_withdraw = amount_owed - self.total_withdrawn_amount;
787 self.add_withdrawn_amount(at, amount_available_to_withdraw)?;
788 if !self.is_cancelled && new_recipient != Pubkey::default() {
789 require!(signer.key() == self.recipient, StreamError::UserUnauthorizedToWithdraw);
791 require!(new_recipient != self.recipient, StreamError::SameRecipients);
792 self.recipient = new_recipient;
793 }
794
795 Ok(amount_available_to_withdraw)
796 }
797
798 pub(crate) fn pause_non_prepaid(&mut self, signer: &Signer) -> Result<()> {
799 require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
800 require!(!self.is_paused, StreamError::StreamIsPaused);
801 require!(self.has_flow_payments(), StreamError::StreamHasNoFlowPayments);
802
803 let signer_key = signer.key();
804 let is_sender = signer_key == self.sender;
805 let is_recipient = signer_key == self.recipient;
806 require!(is_sender || is_recipient, StreamError::UserUnauthorizedToPause);
807
808 let at = get_current_timestamp()?;
809 require!(
810 is_recipient || (self.sender_can_pause && self.sender_can_pause_at <= at),
811 StreamError::SenderCannotPause
812 );
813
814 require!(!self.has_stopped(at), StreamError::StreamHasStopped);
815
816 if at > self.starts_at {
818 self.accumulated_active_time = self.unsafe_get_active_time_after_start(at)?;
819 }
820
821 self.is_paused = true;
822 self.is_paused_by_sender = is_sender;
823
824 Ok(())
825 }
826
827 pub(crate) fn resume_non_prepaid(&mut self, signer: &Signer) -> Result<()> {
828 require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
829 require!(self.is_paused, StreamError::StreamIsNotPaused);
830
831 let signer_key = signer.key();
832 let is_sender = signer_key == self.sender;
833 let is_recipient = signer_key == self.recipient;
834 require!(is_sender || is_recipient, StreamError::UserUnauthorizedToResume);
835
836 let at = get_current_timestamp()?;
837 require!(
838 is_sender
839 || !self.is_paused_by_sender
840 || (self.recipient_can_resume_pause_by_sender && self.recipient_can_resume_pause_by_sender_at <= at),
841 StreamError::RecipientCannotResumePauseBySender
842 );
843
844 require!(!self.has_stopped(at), StreamError::StreamHasStopped);
845
846 self.is_paused = false;
847 self.is_paused_by_sender = false;
848
849 if at > self.starts_at {
851 self.last_resumed_at = at;
852 }
853
854 Ok(())
855 }
856
857 }
859
860pub struct CancelTransferParams {
862 pub transfer_amount_sender: u64,
864 pub transfer_amount_signer: u64,
866 pub transfer_amount_recipient: u64,
868}