use std::cmp::min;
use anchor_lang::prelude::*;
use crate::{error::StreamError, utils::get_current_timestamp};
const ANCHOR_DISCRIMINATOR_LENGTH: usize = 8;
const BOOL_LENGTH: usize = 1;
const U8_LENGTH: usize = 1;
const U64_LENGTH: usize = 8;
const PUBLIC_KEY_LENGTH: usize = 32;
const STRING_LENGTH_PREFIX: usize = 4;
pub const MIN_STREAM_NAME_LENGTH: usize = 2;
pub const MAX_STREAM_NAME_LENGTH: usize = 100;
pub const DEPOSIT_AMOUNT_PERIOD_IN_SECS: u64 = 8 * 60 * 60;
#[account]
#[derive(Debug, PartialEq, Eq)]
pub struct Stream {
pub is_prepaid: bool,
pub mint: Pubkey,
pub sender: Pubkey,
pub recipient: Pubkey,
pub created_at: u64,
pub starts_at: u64,
pub ends_at: u64,
pub initial_amount: u64,
pub flow_interval: u64,
pub flow_rate: u64,
pub is_cancelled: bool,
pub is_cancelled_before_start: bool,
pub is_cancelled_by_sender: bool,
pub cancelled_at: u64,
pub sender_can_cancel: bool,
pub sender_can_cancel_at: u64,
pub sender_can_change_sender: bool,
pub sender_can_change_sender_at: u64,
pub is_paused: bool,
pub is_paused_by_sender: bool,
pub sender_can_pause: bool,
pub sender_can_pause_at: u64,
pub recipient_can_resume_pause_by_sender: bool,
pub recipient_can_resume_pause_by_sender_at: u64,
pub anyone_can_withdraw_for_recipient: bool,
pub anyone_can_withdraw_for_recipient_at: u64,
pub last_resumed_at: u64,
pub accumulated_active_time: u64,
pub total_withdrawn_amount: u64,
pub last_withdrawn_at: u64,
pub last_withdrawn_amount: u64,
pub total_topup_amount: u64,
pub last_topup_at: u64,
pub last_topup_amount: u64,
pub deposit_needed: u64,
pub reserved: [u64; 16],
pub seed: u64,
pub bump: u8,
pub name: String,
}
impl Stream {
const BASE_LENGTH: usize = ANCHOR_DISCRIMINATOR_LENGTH
+ 1 * BOOL_LENGTH + 3 * PUBLIC_KEY_LENGTH + 3 * U64_LENGTH + 3 * U64_LENGTH + 3 * BOOL_LENGTH + 1 * U64_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 2 * BOOL_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 1 * BOOL_LENGTH + 1 * U64_LENGTH + 2 * U64_LENGTH + 3 * U64_LENGTH + 3 * U64_LENGTH + 1 * U64_LENGTH + 16 * U64_LENGTH + 1 * U64_LENGTH + 1 * U8_LENGTH ;
pub fn space(name: &str) -> usize {
Self::BASE_LENGTH + STRING_LENGTH_PREFIX + name.len()
}
pub fn has_flow_payments(&self) -> bool {
self.flow_rate > 0 && (self.ends_at == 0 || self.ends_at > self.starts_at)
}
pub fn get_prepaid_amount_needed(&self) -> Result<u64> {
if !self.is_prepaid || self.ends_at == 0 {
Ok(0)
} else if !self.has_flow_payments() {
Ok(self.initial_amount)
} else {
self.initial_amount
.checked_add(
((self.ends_at - self.starts_at)
.checked_mul(self.flow_rate)
.ok_or(error!(StreamError::PrepaidAmountNeededOutOfBounds))?)
/ self.flow_interval,
)
.ok_or(error!(StreamError::PrepaidAmountNeededOutOfBounds))
}
}
pub fn get_deposit_needed(&self) -> Result<u64> {
Ok(if self.is_prepaid || !self.has_flow_payments() {
0
} else {
let deposit_needed = if self.ends_at == 0 {
DEPOSIT_AMOUNT_PERIOD_IN_SECS
.checked_mul(self.flow_rate)
.ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?
/ self.flow_interval
} else {
min(DEPOSIT_AMOUNT_PERIOD_IN_SECS, self.ends_at - self.starts_at)
.checked_mul(self.flow_rate)
.ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?
/ self.flow_interval
};
if deposit_needed >= 10 {
deposit_needed
} else {
deposit_needed + 1
}
})
}
pub fn get_stops_at(&self) -> u64 {
let cancelled_at = self.cancelled_at;
let ends_at = self.ends_at;
if cancelled_at == 0 {
ends_at
} else if ends_at == 0 {
cancelled_at
} else {
min(ends_at, cancelled_at)
}
}
pub fn has_stopped(&self, at: u64) -> bool {
let stops_at = self.get_stops_at();
return stops_at > 0 && at > stops_at;
}
fn min_with_stopped_at(&self, at: u64) -> u64 {
let stops_at = self.get_stops_at();
if stops_at > 0 && at > stops_at {
stops_at
} else {
at
}
}
fn unsafe_get_active_time_after_start(&self, at: u64) -> Result<u64> {
Ok(if self.is_paused {
self.accumulated_active_time
} else if self.last_resumed_at == 0 {
at - self.starts_at
} else {
(at - self.last_resumed_at)
.checked_add(self.accumulated_active_time)
.ok_or(error!(StreamError::AmountAvailableToWithdrawOutOfBounds))?
})
}
pub fn get_max_acceptable_topup_amount(&self, at: u64) -> Result<(bool, u64)> {
Ok(if self.is_prepaid || !self.has_flow_payments() {
(false, 0)
} else {
let stops_at = self.get_stops_at();
if stops_at == 0 {
(true, 0)
} else if stops_at < self.starts_at {
(false, 0)
} else {
let total_possible_active_time = if at < self.starts_at {
self.ends_at - self.starts_at
} else {
if stops_at <= at {
self.unsafe_get_active_time_after_start(stops_at)?
} else {
self.unsafe_get_active_time_after_start(at)?
.checked_add(stops_at - at)
.ok_or(error!(StreamError::TopupAmountOutOfBounds))?
}
};
let total_possible_topup = if total_possible_active_time == 0 {
self.initial_amount
} else {
self.initial_amount
.checked_add(
(total_possible_active_time
.checked_mul(self.flow_rate)
.ok_or(error!(StreamError::TopupAmountOutOfBounds))?)
/ self.flow_interval,
)
.ok_or(error!(StreamError::TopupAmountOutOfBounds))?
};
(
false,
if total_possible_topup <= self.total_topup_amount {
0
} else {
total_possible_topup - self.total_topup_amount
},
)
}
})
}
pub fn get_amount_owed(&self, at: u64) -> Result<u64> {
let at = self.min_with_stopped_at(at);
Ok(if at < self.starts_at {
0
} else if !self.has_flow_payments() {
self.initial_amount
} else {
let active_time = self.unsafe_get_active_time_after_start(at)?;
if active_time == 0 {
self.initial_amount
} else {
self.initial_amount
.checked_add(
(active_time
.checked_mul(self.flow_rate)
.ok_or(error!(StreamError::AmountAvailableToWithdrawOutOfBounds))?)
/ self.flow_interval,
)
.ok_or(error!(StreamError::AmountAvailableToWithdrawOutOfBounds))?
}
})
}
fn mark_cancelled(&mut self, at: u64, signer: &Signer) {
self.is_cancelled = true;
self.is_cancelled_before_start = at < self.starts_at;
self.is_cancelled_by_sender = signer.key() == self.sender;
self.cancelled_at = at;
}
fn add_topup_amount(&mut self, at: u64, latest_topup_amount: u64) -> Result<()> {
self.total_topup_amount = self
.total_topup_amount
.checked_add(latest_topup_amount)
.ok_or(error!(StreamError::TopupAmountOutOfBounds))?;
self.last_topup_at = at;
self.last_topup_amount = latest_topup_amount;
Ok(())
}
fn add_withdrawn_amount(&mut self, at: u64, latest_withdrawn_amount: u64) -> Result<()> {
if latest_withdrawn_amount == 0 {
return Ok(());
}
self.total_withdrawn_amount = self
.total_withdrawn_amount
.checked_add(latest_withdrawn_amount)
.ok_or(error!(StreamError::WithdrawAmountOutOfBounds))?;
self.last_withdrawn_at = at;
self.last_withdrawn_amount = latest_withdrawn_amount;
Ok(())
}
pub fn initialize(
&mut self,
is_prepaid: bool,
mint: Pubkey,
sender: Pubkey,
recipient: Pubkey,
name: String,
starts_at: u64,
ends_at: u64,
initial_amount: u64,
flow_interval: u64,
flow_rate: u64,
sender_can_cancel: bool,
sender_can_cancel_at: u64,
sender_can_change_sender: bool,
sender_can_change_sender_at: u64,
sender_can_pause: bool,
sender_can_pause_at: u64,
recipient_can_resume_pause_by_sender: bool,
recipient_can_resume_pause_by_sender_at: u64,
anyone_can_withdraw_for_recipient: bool,
anyone_can_withdraw_for_recipient_at: u64,
seed: u64,
bump: u8,
) -> Result<()> {
require!(recipient != Pubkey::default(), StreamError::EmptyRecipient);
require!(name.len() >= MIN_STREAM_NAME_LENGTH, StreamError::StreamNameTooShort);
require!(name.len() <= MAX_STREAM_NAME_LENGTH, StreamError::StreamNameTooLong);
require!(recipient != sender, StreamError::SameSenderAndRecipient);
require!(flow_interval > 0, StreamError::ZeroFlowInterval);
let at = get_current_timestamp()?;
let starts_at = if starts_at < at { at } else { starts_at };
require!(
(!is_prepaid && ends_at == 0) || ends_at >= starts_at,
StreamError::InvalidEndsAt,
);
let sender_can_cancel_at = if sender_can_cancel {
min(sender_can_cancel_at, at)
} else {
0
};
let sender_can_change_sender_at = if sender_can_change_sender {
min(sender_can_change_sender_at, at)
} else {
0
};
let sender_can_pause_at = if sender_can_pause {
min(sender_can_pause_at, at)
} else {
0
};
let recipient_can_resume_pause_by_sender_at = if recipient_can_resume_pause_by_sender {
min(recipient_can_resume_pause_by_sender_at, at)
} else {
0
};
let anyone_can_withdraw_for_recipient_at = if anyone_can_withdraw_for_recipient {
min(anyone_can_withdraw_for_recipient_at, at)
} else {
0
};
self.is_prepaid = is_prepaid;
self.is_cancelled = false;
self.is_cancelled_before_start = false;
self.is_cancelled_by_sender = false;
self.is_paused = false;
self.is_paused_by_sender = false;
self.mint = mint;
self.sender = sender;
self.recipient = recipient;
self.created_at = at;
self.starts_at = starts_at;
self.ends_at = ends_at;
self.initial_amount = initial_amount;
self.flow_interval = flow_interval;
self.flow_rate = flow_rate;
self.sender_can_cancel = sender_can_cancel;
self.sender_can_cancel_at = sender_can_cancel_at;
self.cancelled_at = 0;
self.sender_can_change_sender = sender_can_change_sender;
self.sender_can_change_sender_at = sender_can_change_sender_at;
self.sender_can_pause = sender_can_pause;
self.sender_can_pause_at = sender_can_pause_at;
self.recipient_can_resume_pause_by_sender = recipient_can_resume_pause_by_sender;
self.recipient_can_resume_pause_by_sender_at = recipient_can_resume_pause_by_sender_at;
self.anyone_can_withdraw_for_recipient = anyone_can_withdraw_for_recipient;
self.anyone_can_withdraw_for_recipient_at = anyone_can_withdraw_for_recipient_at;
self.last_resumed_at = 0;
self.accumulated_active_time = 0;
self.total_withdrawn_amount = 0;
self.last_withdrawn_at = 0;
self.last_withdrawn_amount = 0;
self.total_topup_amount = 0;
self.last_topup_at = 0;
self.last_topup_amount = 0;
self.deposit_needed = self.get_deposit_needed()?;
self.seed = seed;
self.bump = bump;
self.name = name;
require!(
self.initial_amount > 0 || self.has_flow_payments(),
StreamError::ZeroLifetimeAmount
);
Ok(())
}
pub fn initialize_prepaid(&mut self) -> Result<u64> {
let prepaid_amount_needed = self.get_prepaid_amount_needed()?;
require!(prepaid_amount_needed > 0, StreamError::ZeroLifetimeAmount);
self.add_topup_amount(get_current_timestamp()?, prepaid_amount_needed)?;
Ok(prepaid_amount_needed)
}
pub fn initialize_non_prepaid(&mut self, topup_amount: u64) -> Result<()> {
require!(topup_amount > 0, StreamError::ZeroAmount);
let amount_needed = self
.initial_amount
.checked_add(
self.deposit_needed
.checked_mul(2)
.ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?,
)
.ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?;
require!(topup_amount >= amount_needed, StreamError::AmountLessThanAmountNeeded);
self.add_topup_amount(get_current_timestamp()?, topup_amount - self.deposit_needed)
}
pub(crate) fn cancel(&mut self, key: Pubkey, signer: &Signer, recipient: Pubkey) -> Result<CancelTransferParams> {
require!(!self.is_cancelled, StreamError::StreamAlreadyCancelled);
require!(recipient == self.recipient, StreamError::InvalidRecipient);
let at = get_current_timestamp()?;
self.mark_cancelled(at, signer);
let total_topup_amount = self.total_topup_amount;
let amount_owed = self.get_amount_owed(at)?;
if total_topup_amount < amount_owed {
let transfer_amount_recipient = if total_topup_amount > self.total_withdrawn_amount {
total_topup_amount - self.total_withdrawn_amount
} else {
0
};
self.add_withdrawn_amount(at, transfer_amount_recipient)?;
if self.is_prepaid {
msg!("Prepaid stream [{}] is insolvent. THIS SHOULD NEVER HAPPEN!!!", key);
Ok(CancelTransferParams {
transfer_amount_sender: 0,
transfer_amount_signer: 0,
transfer_amount_recipient,
})
} else {
let transfer_amount_signer = self.deposit_needed;
self.deposit_needed = 0;
Ok(CancelTransferParams {
transfer_amount_sender: 0,
transfer_amount_signer,
transfer_amount_recipient,
})
}
} else {
let signer_key = signer.key();
require!(
signer_key == self.sender || signer_key == self.recipient,
StreamError::UserUnauthorizedToCancel,
);
require!(
signer_key != self.sender || (self.sender_can_cancel && self.sender_can_cancel_at <= at),
StreamError::SenderCannotCancel,
);
let transfer_amount_sender = total_topup_amount
.checked_add(self.deposit_needed)
.ok_or(error!(StreamError::CancellationRefundOutOfBounds))?
- amount_owed;
self.total_topup_amount = amount_owed;
self.deposit_needed = 0;
let transfer_amount_recipient = if amount_owed > self.total_withdrawn_amount {
amount_owed - self.total_withdrawn_amount
} else {
0
};
self.add_withdrawn_amount(at, transfer_amount_recipient)?;
Ok(CancelTransferParams {
transfer_amount_sender,
transfer_amount_signer: 0,
transfer_amount_recipient,
})
}
}
pub(crate) fn withdraw_excess_topup_non_prepaid_ended(&mut self) -> Result<u64> {
require!(!self.is_cancelled, StreamError::StreamAlreadyCancelled);
let at = get_current_timestamp()?;
require!(self.ends_at > 0 && self.ends_at < at, StreamError::StreamNotEnded);
let total_topup_amount = self.total_topup_amount;
let amount_owed = self.get_amount_owed(at)?;
Ok(if total_topup_amount < amount_owed {
0
} else {
let deposit_needed = self.deposit_needed;
self.total_topup_amount = amount_owed;
self.deposit_needed = 0;
total_topup_amount
.checked_add(deposit_needed)
.ok_or(error!(StreamError::CancellationRefundOutOfBounds))?
- amount_owed
})
}
pub(crate) fn topup_non_prepaid(&mut self, topup_amount: u64) -> Result<()> {
require!(topup_amount > 0, StreamError::ZeroAmount);
require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
require!(self.has_flow_payments(), StreamError::StreamHasNoFlowPayments);
let at = get_current_timestamp()?;
require!(!self.has_stopped(at), StreamError::StreamHasStopped);
let (no_limit, max_acceptable_topup) = self.get_max_acceptable_topup_amount(at)?;
if !no_limit && topup_amount > max_acceptable_topup {
require!(!self.has_stopped(at), StreamError::TopupAmountMoreThanMaxAcceptable);
}
self.add_topup_amount(get_current_timestamp()?, topup_amount)
}
pub(crate) fn change_sender_non_prepaid(&mut self, sender: &Signer, new_sender: Pubkey) -> Result<()> {
require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
require!(sender.key() == self.sender, StreamError::InvalidSender);
require!(new_sender != Pubkey::default(), StreamError::InvalidNewSender);
require!(new_sender != self.sender, StreamError::SameSenders);
let at = get_current_timestamp()?;
require!(
self.sender_can_change_sender && self.sender_can_change_sender_at <= at,
StreamError::SenderCannotChangeSender
);
require!(!self.has_stopped(at), StreamError::StreamHasStopped);
self.sender = new_sender;
Ok(())
}
pub(crate) fn withdraw_and_change_recipient(
&mut self,
signer: &Signer,
recipient: Pubkey,
new_recipient: Pubkey,
) -> Result<u64> {
require!(recipient == self.recipient, StreamError::InvalidRecipient);
let at = get_current_timestamp()?;
require!(
signer.key() == self.recipient
|| (self.anyone_can_withdraw_for_recipient && self.anyone_can_withdraw_for_recipient_at <= at),
StreamError::UserUnauthorizedToWithdraw,
);
let total_topup_amount = self.total_topup_amount;
let at = get_current_timestamp()?;
let mut amount_owed = self.get_amount_owed(at)?;
if amount_owed > total_topup_amount {
amount_owed = if self.is_cancelled {
total_topup_amount
} else {
self.mark_cancelled(at, signer);
total_topup_amount
.checked_add(self.deposit_needed)
.ok_or(error!(StreamError::WithdrawAmountOutOfBounds))?
}
}
require!(
amount_owed >= self.total_withdrawn_amount,
StreamError::WithdrawnAmountGreaterThanAmountOwed,
);
let amount_available_to_withdraw = amount_owed - self.total_withdrawn_amount;
self.add_withdrawn_amount(at, amount_available_to_withdraw)?;
if !self.is_cancelled && new_recipient != Pubkey::default() {
require!(signer.key() == self.recipient, StreamError::UserUnauthorizedToWithdraw);
require!(new_recipient != self.recipient, StreamError::SameRecipients);
self.recipient = new_recipient;
}
Ok(amount_available_to_withdraw)
}
pub(crate) fn pause_non_prepaid(&mut self, signer: &Signer) -> Result<()> {
require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
require!(!self.is_paused, StreamError::StreamIsPaused);
require!(self.has_flow_payments(), StreamError::StreamHasNoFlowPayments);
let signer_key = signer.key();
let is_sender = signer_key == self.sender;
let is_recipient = signer_key == self.recipient;
require!(is_sender || is_recipient, StreamError::UserUnauthorizedToPause);
let at = get_current_timestamp()?;
require!(
is_recipient || (self.sender_can_pause && self.sender_can_pause_at <= at),
StreamError::SenderCannotPause
);
require!(!self.has_stopped(at), StreamError::StreamHasStopped);
if at > self.starts_at {
self.accumulated_active_time = self.unsafe_get_active_time_after_start(at)?;
}
self.is_paused = true;
self.is_paused_by_sender = is_sender;
Ok(())
}
pub(crate) fn resume_non_prepaid(&mut self, signer: &Signer) -> Result<()> {
require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
require!(self.is_paused, StreamError::StreamIsNotPaused);
let signer_key = signer.key();
let is_sender = signer_key == self.sender;
let is_recipient = signer_key == self.recipient;
require!(is_sender || is_recipient, StreamError::UserUnauthorizedToResume);
let at = get_current_timestamp()?;
require!(
is_sender
|| !self.is_paused_by_sender
|| (self.recipient_can_resume_pause_by_sender && self.recipient_can_resume_pause_by_sender_at <= at),
StreamError::RecipientCannotResumePauseBySender
);
require!(!self.has_stopped(at), StreamError::StreamHasStopped);
self.is_paused = false;
self.is_paused_by_sender = false;
if at > self.starts_at {
self.last_resumed_at = at;
}
Ok(())
}
}
pub struct CancelTransferParams {
pub transfer_amount_sender: u64,
pub transfer_amount_signer: u64,
pub transfer_amount_recipient: u64,
}