superstream/
state.rs

1//! Module for superstream state management.
2
3use std::cmp::min;
4
5use anchor_lang::prelude::*;
6
7use crate::{error::StreamError, utils::get_current_timestamp};
8
/// Bytes counted for the account discriminator at the start of `Stream::BASE_LENGTH`.
const ANCHOR_DISCRIMINATOR_LENGTH: usize = 8;

/// Bytes counted for each `bool` field of `Stream`.
const BOOL_LENGTH: usize = 1;
/// Bytes counted for each `u8` field of `Stream`.
const U8_LENGTH: usize = 1;
/// Bytes counted for each `u64` field of `Stream`.
const U64_LENGTH: usize = 8;
/// Bytes counted for each `Pubkey` field of `Stream`.
const PUBLIC_KEY_LENGTH: usize = 32;
/// Bytes added by `Stream::space` in front of the name's own bytes.
// NOTE(review): presumably the length prefix of the serialized `String` - confirm against the serializer.
const STRING_LENGTH_PREFIX: usize = 4;
16
/// Minimum length of a stream name. `Stream::initialize` compares this against `name.len()`, i.e. the length in
/// bytes.
pub const MIN_STREAM_NAME_LENGTH: usize = 2;
/// Maximum length of a stream name. `Stream::initialize` compares this against `name.len()`, i.e. the length in
/// bytes.
pub const MAX_STREAM_NAME_LENGTH: usize = 100;

/// Deposit amount period (in seconds) for a non-prepaid stream. If a non-prepaid stream has unlimited lifetime or
/// lifetime >= DEPOSIT_AMOUNT_PERIOD_IN_SECS, a security deposit is taken from the sender which would not be returned
/// in case the stream becomes insolvent. This is done to make sure users keep topping up their streams on time.
///
/// For shorter streams `Stream::get_deposit_needed` uses the stream's own lifetime instead of this period.
pub const DEPOSIT_AMOUNT_PERIOD_IN_SECS: u64 = 8 * 60 * 60; // 8 hrs
26
/// A payment stream with support for SPL tokens, prepaid and limited upfront payment, unlimited lifetime, cliffs and
/// cancellations.
///
/// Possible states of a stream:
/// - Not started
///     - Scheduled
///     - Cancelled before start
/// - Started but not stopped
///     - Streaming
///     - Paused
/// - Stopped
///     - Cancelled after start
///     - Ended
///
/// All timestamps are `u64` values compared against `get_current_timestamp()`; a value of 0 is used throughout as
/// "not set".
///
/// NOTE(review): `Stream::BASE_LENGTH` counts the size of every field below one by one - keep it in sync when a field
/// is added, removed or resized.
#[account]
#[derive(Debug, PartialEq, Eq)]
pub struct Stream {
    /// If true, the stream is prepaid - all the required amount needs to be deposited on creation. Prepaid streams
    /// cannot have unlimited lifetime.
    pub is_prepaid: bool,

    /// SPL token mint address.
    pub mint: Pubkey,
    /// Sender address.
    pub sender: Pubkey,
    /// Recipient address.
    pub recipient: Pubkey,

    /// Time at which the stream was created.
    pub created_at: u64,
    /// Start time of the stream. `initialize` raises a start time in the past to the creation time.
    ///
    /// INVARIANT: >= created_at
    pub starts_at: u64,
    /// End time of the stream. If the stream is unbounded, this can be 0 to indicate no end time.
    ///
    /// INVARIANT: prepaid: >= starts_at
    /// INVARIANT: unbounded: == 0 || >= starts_at
    pub ends_at: u64,

    /// Amount available to the recipient once stream starts.
    pub initial_amount: u64,
    /// Flow interval is the interval in which flow payments are released. `initialize` rejects 0, which keeps the
    /// divisions by this value well defined.
    pub flow_interval: u64,
    /// Flow rate is the number of tokens to stream per interval.
    pub flow_rate: u64,

    /// If true, the stream has been cancelled.
    pub is_cancelled: bool,
    /// If true, the stream has been cancelled before start.
    ///
    /// INVARIANT: !is_cancelled => == false
    pub is_cancelled_before_start: bool,
    /// If true, the stream has been cancelled by the sender.
    ///
    /// INVARIANT: !is_cancelled || !sender_can_cancel => == false
    // NOTE(review): `mark_cancelled` sets this from `signer == sender` alone, and an insolvent stream can be
    // cancelled by anyone - confirm the `!sender_can_cancel` half of the invariant still holds in that case.
    pub is_cancelled_by_sender: bool,

    /// Time at which the stream was cancelled. If it is > 0, it means the stream has been cancelled and any funds in
    /// the escrow account not available to be withdrawn by the recipient have been retrieved.
    ///
    /// INVARIANT: cancelled_at > 0 iff is_cancelled == true
    pub cancelled_at: u64,

    /// True if a solvent stream can be cancelled by the sender.
    pub sender_can_cancel: bool,
    /// Time at which the sender is allowed to cancel a solvent stream. Stored as 0 when `sender_can_cancel` is false.
    pub sender_can_cancel_at: u64,

    /// True if the sender can change the sender of the stream who will do the upcoming topups.
    ///
    /// INVARIANT: prepaid: false
    pub sender_can_change_sender: bool,
    /// Time at which the sender is allowed to change the sender. Stored as 0 when `sender_can_change_sender` is false.
    ///
    /// INVARIANT: prepaid: == 0
    pub sender_can_change_sender_at: u64,

    /// If true, the stream is paused.
    ///
    /// INVARIANT: prepaid: == false
    pub is_paused: bool,
    /// If true, the stream is paused by sender.
    ///
    /// INVARIANT: prepaid: == false
    /// INVARIANT: runtime: unbounded: !is_paused || !sender_can_pause => == false
    pub is_paused_by_sender: bool,

    /// True if a stream can be paused by the sender.
    ///
    /// INVARIANT: prepaid: false
    pub sender_can_pause: bool,
    /// Time at which the sender is allowed to pause a stream. Stored as 0 when `sender_can_pause` is false.
    ///
    /// INVARIANT: prepaid: == 0
    pub sender_can_pause_at: u64,

    /// True if a stream can be resumed by the recipient if it was paused by the sender.
    ///
    /// INVARIANT: prepaid: false
    pub recipient_can_resume_pause_by_sender: bool,
    /// Time at which the recipient is allowed to resume a stream which was paused by the sender. Stored as 0 when
    /// `recipient_can_resume_pause_by_sender` is false.
    ///
    /// INVARIANT: prepaid: == 0
    pub recipient_can_resume_pause_by_sender_at: u64,

    /// True if anyone can withdraw on behalf of the recipient. The amount will go in recipients' account.
    pub anyone_can_withdraw_for_recipient: bool,
    /// Time at which anyone can withdraw on behalf of the recipient. Stored as 0 when
    /// `anyone_can_withdraw_for_recipient` is false.
    pub anyone_can_withdraw_for_recipient_at: u64,

    /// Time at which the stream was last resumed. 0 means the stream has never been resumed.
    ///
    /// INVARIANT: prepaid: == 0
    /// INVARIANT: unbounded: (== 0 || >= starts_at) && (ends_at == 0 || < ends_at)
    pub last_resumed_at: u64,
    /// Total accumulated active (!is_paused) time since starts_at. This does not include (current_time -
    /// last_resumed_at) time if the stream is not paused.
    ///
    /// INVARIANT: prepaid: == 0
    /// INVARIANT: unbounded: == 0 || (current_time > starts_at && == current_time - starts_at - total_paused_time)
    pub accumulated_active_time: u64,

    /// Total amount withdrawn by the recipient.
    ///
    /// INVARIANT: runtime: prepaid: <= amount_owed && <= prepaid_amount_needed
    /// INVARIANT: runtime: unbounded: <= amount_owed && <= total_topup_amount
    pub total_withdrawn_amount: u64,
    /// Last time at which recipient withdrew any amount.
    pub last_withdrawn_at: u64,
    /// Last amount which recipient withdrew.
    pub last_withdrawn_amount: u64,

    /// Total topup amount added for the stream. The security deposit is tracked separately in `deposit_needed` and is
    /// not part of this amount.
    ///
    /// INVARIANT: prepaid: == total_prepaid_amount
    /// INVARIANT: unbounded: >= initial_amount + streaming_amount_owed
    pub total_topup_amount: u64,
    /// Last time at which sender topped up the stream.
    pub last_topup_at: u64,
    /// Last topup amount.
    pub last_topup_amount: u64,

    /// Total deposit amount needed for the non-prepaid stream. These are needed in case the sender does not topup the
    /// stream in time and the amount owed becomes > total topup amount. When that happens, anyone can cancel the
    /// stream. The deposit amount will be distributed as a reward to whoever finds the insolvency and cancels the
    /// stream. Reset to 0 once the stream is cancelled or the excess topup is withdrawn after the stream ended.
    ///
    /// INVARIANT: prepaid: == 0
    /// INVARIANT: unbounded: == DEPOSIT_AMOUNT_PERIOD_IN_SECS of streaming payments
    pub deposit_needed: u64,

    /// Extra space for program upgrades. Not read or written by the code in this module.
    pub reserved: [u64; 16],

    /// Seed of the stream PDA. It's upto the client how they choose the seed. Each tuple (seed, mint, name) corresponds
    /// to a unique stream.
    pub seed: u64,
    /// The PDA bump.
    pub bump: u8,

    /// Name of the stream. Should be unique for a particular set of (seed, mint).
    ///
    /// INVARIANT: Length <= 100 unicode chars or 400 bytes
    // NOTE(review): `initialize` validates `name.len()` (a byte count) against MIN_STREAM_NAME_LENGTH and
    // MAX_STREAM_NAME_LENGTH, i.e. at most 100 bytes - confirm which limit is the intended one.
    pub name: String,
}
192
193/// INVARIANT: total_topup_amount_sent_by_sender ==
194///                amount_owed_to_recipient + amount_owed_to_cancelling_account
195///                + excess_topup_amount_available_to_withdraw_to_sender_after_ended
196///
197/// For solvent stream:
198///
199/// INVARIANT: total_topup_amount_sent_by_sender ==
200///                total_topup_amount + deposit_needed
201/// INVARIANT: amount_owed_to_recipient ==
202///                amount_owed
203/// INVARIANT: amount_owed_to_cancelling_account ==
204///                0
205/// INVARIANT: excess_topup_amount_available_to_withdraw_to_sender_after_ended ==
206///                total_topup_amount + deposit_needed - amount_owed
207///
208/// For non-cancelled insolvent stream:
209///
210/// INVARIANT: total_topup_amount_sent_by_sender ==
211///                total_topup_amount + deposit_needed
212/// INVARIANT: amount_owed_to_recipient ==
213///                total_topup_amount + deposit_needed
214/// INVARIANT: amount_owed_to_cancelling_account ==
215///                0
216/// INVARIANT: excess_topup_amount_available_to_withdraw_to_sender_after_ended ==
217///                0
218///
219/// For cancelled insolvent stream: (deposit_needed == 0 for cancelled streams)
220///
221/// INVARIANT: total_topup_amount_sent_by_sender ==
222///                total_topup_amount + insolvent_stream_cancellation_reward (== initial_deposit_needed)
223/// INVARIANT: amount_owed_to_recipient ==
224///                total_topup_amount
225/// INVARIANT: amount_owed_to_cancelling_account ==
226///                insolvent_stream_cancellation_reward (== initial_deposit_needed)
227/// INVARIANT: excess_topup_amount_available_to_withdraw_to_sender_after_ended ==
228///                0
229impl Stream {
230    /// Total size of a Stream account excluding space taken up by the name
231    const BASE_LENGTH: usize = ANCHOR_DISCRIMINATOR_LENGTH
232        + 1 * BOOL_LENGTH       // is_prepaid - 9
233        + 3 * PUBLIC_KEY_LENGTH // sender, recipient, mint - 105
234        + 3 * U64_LENGTH        // created_at, starts_at, ends_at - 129
235        + 3 * U64_LENGTH        // initial_amount, flow_interval, flow_rate - 153
236        + 3 * BOOL_LENGTH       // is_cancelled, is_cancelled_before_start, is_cancelled_by_sender - 156
237        + 1 * U64_LENGTH        // cancelled_at - 164
238        + 1 * BOOL_LENGTH       // sender_can_cancel - 165
239        + 1 * U64_LENGTH        // sender_can_cancel_at - 173
240        + 1 * BOOL_LENGTH       // sender_can_change_sender - 174
241        + 1 * U64_LENGTH        // sender_can_change_sender_at - 182
242        + 2 * BOOL_LENGTH       // is_paused, is_paused_by_sender - 184
243        + 1 * BOOL_LENGTH       // sender_can_pause - 185
244        + 1 * U64_LENGTH        // sender_can_pause_at - 193
245        + 1 * BOOL_LENGTH       // recipient_can_resume_pause_by_sender - 194
246        + 1 * U64_LENGTH        // recipient_can_resume_pause_by_sender_at - 202
247        + 1 * BOOL_LENGTH       // anyone_can_withdraw_for_recipient - 203
248        + 1 * U64_LENGTH        // anyone_can_withdraw_for_recipient_at - 211
249        + 2 * U64_LENGTH        // last_resumed_at, accumulated_active_time - 227
250        + 3 * U64_LENGTH        // total_withdrawn_amount, last_withdrawn_at, last_withdrawn_amount - 251
251        + 3 * U64_LENGTH        // total_topup_amount, last_topup_at, last_topup_amount - 275
252        + 1 * U64_LENGTH        // deposit_needed - 283
253        + 16 * U64_LENGTH       // reserved - 411
254        + 1 * U64_LENGTH        // seed - 419
255        + 1 * U8_LENGTH         // bump - 420
256    ;
257
258    pub fn space(name: &str) -> usize {
259        Self::BASE_LENGTH + STRING_LENGTH_PREFIX + name.len()
260    }
261
262    // --- Utility functions --- BEGIN ---
263
264    pub fn has_flow_payments(&self) -> bool {
265        self.flow_rate > 0 && (self.ends_at == 0 || self.ends_at > self.starts_at)
266    }
267
268    /// Calculate the amount of prepaid needed for a prepaid stream. This is called when creating the stream.
269    pub fn get_prepaid_amount_needed(&self) -> Result<u64> {
270        if !self.is_prepaid || self.ends_at == 0 {
271            Ok(0)
272        } else if !self.has_flow_payments() {
273            Ok(self.initial_amount)
274        } else {
275            self.initial_amount
276                .checked_add(
277                    ((self.ends_at - self.starts_at)
278                        .checked_mul(self.flow_rate)
279                        .ok_or(error!(StreamError::PrepaidAmountNeededOutOfBounds))?)
280                        / self.flow_interval,
281                )
282                .ok_or(error!(StreamError::PrepaidAmountNeededOutOfBounds))
283        }
284    }
285
286    /// Calculate the amount of deposit needed for the streaming payments excluding the initial amount. This is called
287    /// when creating the stream.
288    pub fn get_deposit_needed(&self) -> Result<u64> {
289        Ok(if self.is_prepaid || !self.has_flow_payments() {
290            0
291        } else {
292            let deposit_needed = if self.ends_at == 0 {
293                DEPOSIT_AMOUNT_PERIOD_IN_SECS
294                    .checked_mul(self.flow_rate)
295                    .ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?
296                    / self.flow_interval
297            } else {
298                min(DEPOSIT_AMOUNT_PERIOD_IN_SECS, self.ends_at - self.starts_at)
299                    .checked_mul(self.flow_rate)
300                    .ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?
301                    / self.flow_interval
302            };
303
304            if deposit_needed >= 10 {
305                deposit_needed
306            } else {
307                deposit_needed + 1
308            }
309        })
310    }
311
312    pub fn get_stops_at(&self) -> u64 {
313        let cancelled_at = self.cancelled_at;
314        let ends_at = self.ends_at;
315        if cancelled_at == 0 {
316            ends_at
317        } else if ends_at == 0 {
318            cancelled_at
319        } else {
320            min(ends_at, cancelled_at)
321        }
322    }
323
324    /// Check if the stream has stooped.
325    pub fn has_stopped(&self, at: u64) -> bool {
326        let stops_at = self.get_stops_at();
327        return stops_at > 0 && at > stops_at;
328    }
329
330    fn min_with_stopped_at(&self, at: u64) -> u64 {
331        let stops_at = self.get_stops_at();
332        if stops_at > 0 && at > stops_at {
333            // If the stream has been stopped for some reason - either ending or being cancelled - make at = stopped_at
334            // if the stream stopped before at. This will make sure, the amount is calculated only till the time the
335            // stream was active.
336            stops_at
337        } else {
338            at
339        }
340    }
341
342    // INVARIANT: (stops_at == 0 || at <= stops_at) && at >= self.starts_at && self.has_flow_payments()
343    fn unsafe_get_active_time_after_start(&self, at: u64) -> Result<u64> {
344        Ok(if self.is_paused {
345            // INVARIANT: The stream is paused => accumulated time is the total time.
346            self.accumulated_active_time
347        } else if self.last_resumed_at == 0 {
348            // INVARIANT: The stream is not paused and was never resumed => stream was never paused.
349            at - self.starts_at
350        } else {
351            // SAFETY: INVARIANT: last_resumed_at != 0 =>
352            //     last_resumed_at >= starts_at && (ends_at == 0 || < ends_at) =>
353            //     last_resumed_at will never be > ends_at if ends_at > 0
354            (at - self.last_resumed_at)
355                .checked_add(self.accumulated_active_time)
356                .ok_or(error!(StreamError::AmountAvailableToWithdrawOutOfBounds))?
357        })
358    }
359
360    /// Get the maximum acceptable topup amount.
361    pub fn get_max_acceptable_topup_amount(&self, at: u64) -> Result<(bool, u64)> {
362        Ok(if self.is_prepaid || !self.has_flow_payments() {
363            (false, 0)
364        } else {
365            // Streams: non-prepaid, with flow payments.
366            let stops_at = self.get_stops_at();
367            if stops_at == 0 {
368                // Streams: non-prepaid, with flow payments, non-cancelled with no end time.
369                (true, 0)
370            } else if stops_at < self.starts_at {
371                // Streams: non-prepaid, with flow payments, stopped before start.
372                (false, 0)
373            } else {
374                // Streams: non-prepaid, with flow payments, cancelled after start time and/or with set end time after
375                // start time.
376                let total_possible_active_time = if at < self.starts_at {
377                    // Streams: non-prepaid, with flow payments, not-started, with set end time after start time. The
378                    // stream cannot be cancelled before current time.
379                    self.ends_at - self.starts_at
380                } else {
381                    // Streams: non-prepaid, with flow payments, started, cancelled after start time and/or with set end
382                    // time after start time.
383                    if stops_at <= at {
384                        // Streams: non-prepaid, with flow payments, started, stopped after start time.
385                        self.unsafe_get_active_time_after_start(stops_at)?
386                    } else {
387                        // Streams: non-prepaid, with flow payments, started, not already stopped => with set end time
388                        // after start time in the future.
389                        self.unsafe_get_active_time_after_start(at)?
390                            .checked_add(stops_at - at)
391                            .ok_or(error!(StreamError::TopupAmountOutOfBounds))?
392                    }
393                };
394
395                let total_possible_topup = if total_possible_active_time == 0 {
396                    self.initial_amount
397                } else {
398                    self.initial_amount
399                        .checked_add(
400                            (total_possible_active_time
401                                .checked_mul(self.flow_rate)
402                                .ok_or(error!(StreamError::TopupAmountOutOfBounds))?)
403                                / self.flow_interval,
404                        )
405                        .ok_or(error!(StreamError::TopupAmountOutOfBounds))?
406                };
407
408                (
409                    false,
410                    if total_possible_topup <= self.total_topup_amount {
411                        0
412                    } else {
413                        total_possible_topup - self.total_topup_amount
414                    },
415                )
416            }
417        })
418    }
419
420    /// Get the total amount owed to the recipient.
421    pub fn get_amount_owed(&self, at: u64) -> Result<u64> {
422        let at = self.min_with_stopped_at(at);
423
424        Ok(if at < self.starts_at {
425            0
426        } else if !self.has_flow_payments() {
427            self.initial_amount
428        } else {
429            let active_time = self.unsafe_get_active_time_after_start(at)?;
430            if active_time == 0 {
431                self.initial_amount
432            } else {
433                self.initial_amount
434                    .checked_add(
435                        (active_time
436                            .checked_mul(self.flow_rate)
437                            .ok_or(error!(StreamError::AmountAvailableToWithdrawOutOfBounds))?)
438                            / self.flow_interval,
439                    )
440                    .ok_or(error!(StreamError::AmountAvailableToWithdrawOutOfBounds))?
441            }
442        })
443    }
444
445    fn mark_cancelled(&mut self, at: u64, signer: &Signer) {
446        self.is_cancelled = true;
447        self.is_cancelled_before_start = at < self.starts_at;
448        self.is_cancelled_by_sender = signer.key() == self.sender;
449        self.cancelled_at = at;
450    }
451
452    fn add_topup_amount(&mut self, at: u64, latest_topup_amount: u64) -> Result<()> {
453        self.total_topup_amount = self
454            .total_topup_amount
455            .checked_add(latest_topup_amount)
456            .ok_or(error!(StreamError::TopupAmountOutOfBounds))?;
457        self.last_topup_at = at;
458        self.last_topup_amount = latest_topup_amount;
459        Ok(())
460    }
461
462    fn add_withdrawn_amount(&mut self, at: u64, latest_withdrawn_amount: u64) -> Result<()> {
463        if latest_withdrawn_amount == 0 {
464            return Ok(());
465        }
466
467        self.total_withdrawn_amount = self
468            .total_withdrawn_amount
469            .checked_add(latest_withdrawn_amount)
470            .ok_or(error!(StreamError::WithdrawAmountOutOfBounds))?;
471        self.last_withdrawn_at = at;
472        self.last_withdrawn_amount = latest_withdrawn_amount;
473        Ok(())
474    }
475
476    // --- Utility functions --- END ---
477
478    // --- Instruction functions --- BEGIN ---
479
480    /// Initialize a stream.
481    pub fn initialize(
482        &mut self,
483        is_prepaid: bool,
484        mint: Pubkey,
485        sender: Pubkey,
486        recipient: Pubkey,
487        name: String,
488        starts_at: u64,
489        ends_at: u64,
490        initial_amount: u64,
491        flow_interval: u64,
492        flow_rate: u64,
493        sender_can_cancel: bool,
494        sender_can_cancel_at: u64,
495        sender_can_change_sender: bool,
496        sender_can_change_sender_at: u64,
497        sender_can_pause: bool,
498        sender_can_pause_at: u64,
499        recipient_can_resume_pause_by_sender: bool,
500        recipient_can_resume_pause_by_sender_at: u64,
501        anyone_can_withdraw_for_recipient: bool,
502        anyone_can_withdraw_for_recipient_at: u64,
503        seed: u64,
504        bump: u8,
505    ) -> Result<()> {
506        require!(recipient != Pubkey::default(), StreamError::EmptyRecipient);
507        require!(name.len() >= MIN_STREAM_NAME_LENGTH, StreamError::StreamNameTooShort);
508        require!(name.len() <= MAX_STREAM_NAME_LENGTH, StreamError::StreamNameTooLong);
509        require!(recipient != sender, StreamError::SameSenderAndRecipient);
510        require!(flow_interval > 0, StreamError::ZeroFlowInterval);
511
512        let at = get_current_timestamp()?;
513        let starts_at = if starts_at < at { at } else { starts_at };
514
515        require!(
516            (!is_prepaid && ends_at == 0) || ends_at >= starts_at,
517            StreamError::InvalidEndsAt,
518        );
519
520        let sender_can_cancel_at = if sender_can_cancel {
521            min(sender_can_cancel_at, at)
522        } else {
523            0
524        };
525        let sender_can_change_sender_at = if sender_can_change_sender {
526            min(sender_can_change_sender_at, at)
527        } else {
528            0
529        };
530        let sender_can_pause_at = if sender_can_pause {
531            min(sender_can_pause_at, at)
532        } else {
533            0
534        };
535        let recipient_can_resume_pause_by_sender_at = if recipient_can_resume_pause_by_sender {
536            min(recipient_can_resume_pause_by_sender_at, at)
537        } else {
538            0
539        };
540        let anyone_can_withdraw_for_recipient_at = if anyone_can_withdraw_for_recipient {
541            min(anyone_can_withdraw_for_recipient_at, at)
542        } else {
543            0
544        };
545
546        self.is_prepaid = is_prepaid;
547        self.is_cancelled = false;
548        self.is_cancelled_before_start = false;
549        self.is_cancelled_by_sender = false;
550        self.is_paused = false;
551        self.is_paused_by_sender = false;
552        self.mint = mint;
553        self.sender = sender;
554        self.recipient = recipient;
555        self.created_at = at;
556        self.starts_at = starts_at;
557        self.ends_at = ends_at;
558        self.initial_amount = initial_amount;
559        self.flow_interval = flow_interval;
560        self.flow_rate = flow_rate;
561        self.sender_can_cancel = sender_can_cancel;
562        self.sender_can_cancel_at = sender_can_cancel_at;
563        self.cancelled_at = 0;
564        self.sender_can_change_sender = sender_can_change_sender;
565        self.sender_can_change_sender_at = sender_can_change_sender_at;
566        self.sender_can_pause = sender_can_pause;
567        self.sender_can_pause_at = sender_can_pause_at;
568        self.recipient_can_resume_pause_by_sender = recipient_can_resume_pause_by_sender;
569        self.recipient_can_resume_pause_by_sender_at = recipient_can_resume_pause_by_sender_at;
570        self.anyone_can_withdraw_for_recipient = anyone_can_withdraw_for_recipient;
571        self.anyone_can_withdraw_for_recipient_at = anyone_can_withdraw_for_recipient_at;
572        self.last_resumed_at = 0;
573        self.accumulated_active_time = 0;
574        self.total_withdrawn_amount = 0;
575        self.last_withdrawn_at = 0;
576        self.last_withdrawn_amount = 0;
577        self.total_topup_amount = 0;
578        self.last_topup_at = 0;
579        self.last_topup_amount = 0;
580        self.deposit_needed = self.get_deposit_needed()?;
581        self.seed = seed;
582        self.bump = bump;
583        self.name = name;
584
585        require!(
586            self.initial_amount > 0 || self.has_flow_payments(),
587            StreamError::ZeroLifetimeAmount
588        );
589        Ok(())
590    }
591
592    /// Initialize a prepaid stream.
593    pub fn initialize_prepaid(&mut self) -> Result<u64> {
594        let prepaid_amount_needed = self.get_prepaid_amount_needed()?;
595        require!(prepaid_amount_needed > 0, StreamError::ZeroLifetimeAmount);
596        self.add_topup_amount(get_current_timestamp()?, prepaid_amount_needed)?;
597        Ok(prepaid_amount_needed)
598    }
599
600    /// Initialize a non-prepaid stream.
601    pub fn initialize_non_prepaid(&mut self, topup_amount: u64) -> Result<()> {
602        require!(topup_amount > 0, StreamError::ZeroAmount);
603
604        // Amount needed = initial_amount + 2 * deposit_amount.
605        //
606        // We are doing 2 times deposit amount, because if it was just once, the stream would become insolvent
607        // immediately.
608        let amount_needed = self
609            .initial_amount
610            .checked_add(
611                self.deposit_needed
612                    .checked_mul(2)
613                    .ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?,
614            )
615            .ok_or(error!(StreamError::DepositAmountNeededOutOfBounds))?;
616        require!(topup_amount >= amount_needed, StreamError::AmountLessThanAmountNeeded);
617        self.add_topup_amount(get_current_timestamp()?, topup_amount - self.deposit_needed)
618    }
619
620    pub(crate) fn cancel(&mut self, key: Pubkey, signer: &Signer, recipient: Pubkey) -> Result<CancelTransferParams> {
621        require!(!self.is_cancelled, StreamError::StreamAlreadyCancelled);
622        require!(recipient == self.recipient, StreamError::InvalidRecipient);
623
624        let at = get_current_timestamp()?;
625        self.mark_cancelled(at, signer);
626
627        let total_topup_amount = self.total_topup_amount;
628        let amount_owed = self.get_amount_owed(at)?;
629        if total_topup_amount < amount_owed {
630            // The stream is insolvent. Anyone can cancel.
631            let transfer_amount_recipient = if total_topup_amount > self.total_withdrawn_amount {
632                total_topup_amount - self.total_withdrawn_amount
633            } else {
634                0
635            };
636            self.add_withdrawn_amount(at, transfer_amount_recipient)?;
637
638            if self.is_prepaid {
639                msg!("Prepaid stream [{}] is insolvent. THIS SHOULD NEVER HAPPEN!!!", key);
640                Ok(CancelTransferParams {
641                    transfer_amount_sender: 0,
642                    transfer_amount_signer: 0,
643                    transfer_amount_recipient,
644                })
645            } else {
646                // The deposit is given as reward to the signer. The remaining topup amount can be withdrawn by the
647                // recipient.
648                let transfer_amount_signer = self.deposit_needed;
649                self.deposit_needed = 0;
650                Ok(CancelTransferParams {
651                    transfer_amount_sender: 0,
652                    transfer_amount_signer,
653                    transfer_amount_recipient,
654                })
655            }
656        } else {
657            // The stream is still solvent. Only the sender and recipient can cancel.
658            let signer_key = signer.key();
659            require!(
660                signer_key == self.sender || signer_key == self.recipient,
661                StreamError::UserUnauthorizedToCancel,
662            );
663            require!(
664                signer_key != self.sender || (self.sender_can_cancel && self.sender_can_cancel_at <= at),
665                StreamError::SenderCannotCancel,
666            );
667
668            // Return anything the sender paid - topup or deposit that is not owed to the recipient. The stream has been
669            // cancelled and stopped, so the deposit is no longer needed.
670            let transfer_amount_sender = total_topup_amount
671                .checked_add(self.deposit_needed)
672                .ok_or(error!(StreamError::CancellationRefundOutOfBounds))?
673                - amount_owed;
674
675            self.total_topup_amount = amount_owed;
676            self.deposit_needed = 0;
677
678            let transfer_amount_recipient = if amount_owed > self.total_withdrawn_amount {
679                amount_owed - self.total_withdrawn_amount
680            } else {
681                0
682            };
683            self.add_withdrawn_amount(at, transfer_amount_recipient)?;
684
685            Ok(CancelTransferParams {
686                transfer_amount_sender,
687                transfer_amount_signer: 0,
688                transfer_amount_recipient,
689            })
690        }
691    }
692
693    pub(crate) fn withdraw_excess_topup_non_prepaid_ended(&mut self) -> Result<u64> {
694        require!(!self.is_cancelled, StreamError::StreamAlreadyCancelled);
695
696        let at = get_current_timestamp()?;
697        require!(self.ends_at > 0 && self.ends_at < at, StreamError::StreamNotEnded);
698
699        let total_topup_amount = self.total_topup_amount;
700        let amount_owed = self.get_amount_owed(at)?;
701        Ok(if total_topup_amount < amount_owed {
702            // The stream is insolvent. Nothing to do.
703            0
704        } else {
705            let deposit_needed = self.deposit_needed;
706
707            self.total_topup_amount = amount_owed;
708            self.deposit_needed = 0;
709
710            total_topup_amount
711                .checked_add(deposit_needed)
712                .ok_or(error!(StreamError::CancellationRefundOutOfBounds))?
713                - amount_owed
714        })
715    }
716
717    pub(crate) fn topup_non_prepaid(&mut self, topup_amount: u64) -> Result<()> {
718        require!(topup_amount > 0, StreamError::ZeroAmount);
719        require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
720        require!(self.has_flow_payments(), StreamError::StreamHasNoFlowPayments);
721
722        let at = get_current_timestamp()?;
723        require!(!self.has_stopped(at), StreamError::StreamHasStopped);
724
725        let (no_limit, max_acceptable_topup) = self.get_max_acceptable_topup_amount(at)?;
726        if !no_limit && topup_amount > max_acceptable_topup {
727            require!(!self.has_stopped(at), StreamError::TopupAmountMoreThanMaxAcceptable);
728        }
729
730        self.add_topup_amount(get_current_timestamp()?, topup_amount)
731    }
732
733    pub(crate) fn change_sender_non_prepaid(&mut self, sender: &Signer, new_sender: Pubkey) -> Result<()> {
734        require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
735        require!(sender.key() == self.sender, StreamError::InvalidSender);
736        require!(new_sender != Pubkey::default(), StreamError::InvalidNewSender);
737        require!(new_sender != self.sender, StreamError::SameSenders);
738
739        let at = get_current_timestamp()?;
740        require!(
741            self.sender_can_change_sender && self.sender_can_change_sender_at <= at,
742            StreamError::SenderCannotChangeSender
743        );
744        require!(!self.has_stopped(at), StreamError::StreamHasStopped);
745
746        self.sender = new_sender;
747        Ok(())
748    }
749
750    pub(crate) fn withdraw_and_change_recipient(
751        &mut self,
752        signer: &Signer,
753        recipient: Pubkey,
754        new_recipient: Pubkey,
755    ) -> Result<u64> {
756        require!(recipient == self.recipient, StreamError::InvalidRecipient);
757
758        let at = get_current_timestamp()?;
759        require!(
760            signer.key() == self.recipient
761                || (self.anyone_can_withdraw_for_recipient && self.anyone_can_withdraw_for_recipient_at <= at),
762            StreamError::UserUnauthorizedToWithdraw,
763        );
764
765        let total_topup_amount = self.total_topup_amount;
766
767        let at = get_current_timestamp()?;
768        let mut amount_owed = self.get_amount_owed(at)?;
769        if amount_owed > total_topup_amount {
770            // The stream is insolvent. Cancel the stream if not already cancelled. Recipient is owed the whole topup
771            // amount and if the stream is not cancelled yet, also the deposit amount.
772            amount_owed = if self.is_cancelled {
773                total_topup_amount
774            } else {
775                self.mark_cancelled(at, signer);
776                total_topup_amount
777                    .checked_add(self.deposit_needed)
778                    .ok_or(error!(StreamError::WithdrawAmountOutOfBounds))?
779            }
780        }
781
782        require!(
783            amount_owed >= self.total_withdrawn_amount,
784            StreamError::WithdrawnAmountGreaterThanAmountOwed,
785        );
786        let amount_available_to_withdraw = amount_owed - self.total_withdrawn_amount;
787        self.add_withdrawn_amount(at, amount_available_to_withdraw)?;
788        if !self.is_cancelled && new_recipient != Pubkey::default() {
789            // Only the recipient can change the recipient.
790            require!(signer.key() == self.recipient, StreamError::UserUnauthorizedToWithdraw);
791            require!(new_recipient != self.recipient, StreamError::SameRecipients);
792            self.recipient = new_recipient;
793        }
794
795        Ok(amount_available_to_withdraw)
796    }
797
798    pub(crate) fn pause_non_prepaid(&mut self, signer: &Signer) -> Result<()> {
799        require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
800        require!(!self.is_paused, StreamError::StreamIsPaused);
801        require!(self.has_flow_payments(), StreamError::StreamHasNoFlowPayments);
802
803        let signer_key = signer.key();
804        let is_sender = signer_key == self.sender;
805        let is_recipient = signer_key == self.recipient;
806        require!(is_sender || is_recipient, StreamError::UserUnauthorizedToPause);
807
808        let at = get_current_timestamp()?;
809        require!(
810            is_recipient || (self.sender_can_pause && self.sender_can_pause_at <= at),
811            StreamError::SenderCannotPause
812        );
813
814        require!(!self.has_stopped(at), StreamError::StreamHasStopped);
815
816        // Update accumulated_active_time if there has been any flow till `at`.
817        if at > self.starts_at {
818            self.accumulated_active_time = self.unsafe_get_active_time_after_start(at)?;
819        }
820
821        self.is_paused = true;
822        self.is_paused_by_sender = is_sender;
823
824        Ok(())
825    }
826
827    pub(crate) fn resume_non_prepaid(&mut self, signer: &Signer) -> Result<()> {
828        require!(!self.is_prepaid, StreamError::StreamIsPrepaid);
829        require!(self.is_paused, StreamError::StreamIsNotPaused);
830
831        let signer_key = signer.key();
832        let is_sender = signer_key == self.sender;
833        let is_recipient = signer_key == self.recipient;
834        require!(is_sender || is_recipient, StreamError::UserUnauthorizedToResume);
835
836        let at = get_current_timestamp()?;
837        require!(
838            is_sender
839                || !self.is_paused_by_sender
840                || (self.recipient_can_resume_pause_by_sender && self.recipient_can_resume_pause_by_sender_at <= at),
841            StreamError::RecipientCannotResumePauseBySender
842        );
843
844        require!(!self.has_stopped(at), StreamError::StreamHasStopped);
845
846        self.is_paused = false;
847        self.is_paused_by_sender = false;
848
849        // Update last_resumed_at if there has been any flow till `at`.
850        if at > self.starts_at {
851            self.last_resumed_at = at;
852        }
853
854        Ok(())
855    }
856
857    // --- Instruction functions --- END ---
858}
859
/// Record of funds to be transferred once a stream is cancelled.
///
/// Each field holds the amount destined for one party; a value of `0` means nothing is transferred to that party.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CancelTransferParams {
    /// Transfer fund amount to the stream sender.
    pub transfer_amount_sender: u64,
    /// Transfer fund amount to the signer.
    pub transfer_amount_signer: u64,
    /// Transfer fund amount to the stream recipient.
    pub transfer_amount_recipient: u64,
}