1use crate::{Client, Error, Keypair, Pubkey, Result};
2use serde::{Deserialize, Serialize};
3
4const PROGRAM_SYMBOL_CANDIDATES: [&str; 2] = ["SPOREPAY", "sporepay"];
5const STREAM_SIZE: usize = 105;
6const STREAM_INFO_SIZE: usize = 113;
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct SporePayStream {
10 pub stream_id: u64,
11 pub sender: String,
12 pub recipient: String,
13 pub total_amount: u64,
14 pub withdrawn_amount: u64,
15 pub start_slot: u64,
16 pub end_slot: u64,
17 pub cancelled: bool,
18 pub created_slot: u64,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct SporePayStreamInfo {
23 #[serde(flatten)]
24 pub stream: SporePayStream,
25 pub cliff_slot: u64,
26}
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct SporePayStats {
30 pub stream_count: u64,
31 pub total_streamed: u64,
32 pub total_withdrawn: u64,
33 pub cancel_count: u64,
34 pub paused: bool,
35}
36
37#[derive(Debug, Clone)]
38pub struct CreateStreamParams {
39 pub recipient: Pubkey,
40 pub total_amount: u64,
41 pub start_slot: u64,
42 pub end_slot: u64,
43}
44
45#[derive(Debug, Clone)]
46pub struct CreateStreamWithCliffParams {
47 pub recipient: Pubkey,
48 pub total_amount: u64,
49 pub start_slot: u64,
50 pub end_slot: u64,
51 pub cliff_slot: u64,
52}
53
54#[derive(Debug, Clone)]
55pub struct WithdrawFromStreamParams {
56 pub stream_id: u64,
57 pub amount: u64,
58}
59
60#[derive(Debug, Clone)]
61pub struct TransferStreamParams {
62 pub stream_id: u64,
63 pub new_recipient: Pubkey,
64}
65
66#[derive(Debug, Clone)]
67pub struct SporePayClient {
68 client: Client,
69 program_id: std::sync::Arc<std::sync::Mutex<Option<Pubkey>>>,
70}
71
72fn build_layout_args(layout: &[u8], chunks: &[Vec<u8>]) -> Vec<u8> {
73 let mut out = Vec::with_capacity(
74 1 + layout.len() + chunks.iter().map(|chunk| chunk.len()).sum::<usize>(),
75 );
76 out.push(0xAB);
77 out.extend_from_slice(layout);
78 for chunk in chunks {
79 out.extend_from_slice(chunk);
80 }
81 out
82}
83
84fn encode_create_stream_args(sender: &Pubkey, params: &CreateStreamParams) -> Vec<u8> {
85 build_layout_args(
86 &[0x20, 0x20, 0x08, 0x08, 0x08],
87 &[
88 sender.as_ref().to_vec(),
89 params.recipient.as_ref().to_vec(),
90 params.total_amount.to_le_bytes().to_vec(),
91 params.start_slot.to_le_bytes().to_vec(),
92 params.end_slot.to_le_bytes().to_vec(),
93 ],
94 )
95}
96
97fn encode_create_stream_with_cliff_args(
98 sender: &Pubkey,
99 params: &CreateStreamWithCliffParams,
100) -> Vec<u8> {
101 build_layout_args(
102 &[0x20, 0x20, 0x08, 0x08, 0x08, 0x08],
103 &[
104 sender.as_ref().to_vec(),
105 params.recipient.as_ref().to_vec(),
106 params.total_amount.to_le_bytes().to_vec(),
107 params.start_slot.to_le_bytes().to_vec(),
108 params.end_slot.to_le_bytes().to_vec(),
109 params.cliff_slot.to_le_bytes().to_vec(),
110 ],
111 )
112}
113
114fn encode_withdraw_args(caller: &Pubkey, params: &WithdrawFromStreamParams) -> Vec<u8> {
115 build_layout_args(
116 &[0x20, 0x08, 0x08],
117 &[
118 caller.as_ref().to_vec(),
119 params.stream_id.to_le_bytes().to_vec(),
120 params.amount.to_le_bytes().to_vec(),
121 ],
122 )
123}
124
125fn encode_cancel_args(caller: &Pubkey, stream_id: u64) -> Vec<u8> {
126 build_layout_args(
127 &[0x20, 0x08],
128 &[caller.as_ref().to_vec(), stream_id.to_le_bytes().to_vec()],
129 )
130}
131
132fn encode_transfer_args(caller: &Pubkey, params: &TransferStreamParams) -> Vec<u8> {
133 build_layout_args(
134 &[0x20, 0x20, 0x08],
135 &[
136 caller.as_ref().to_vec(),
137 params.new_recipient.as_ref().to_vec(),
138 params.stream_id.to_le_bytes().to_vec(),
139 ],
140 )
141}
142
143fn encode_stream_lookup_args(stream_id: u64) -> Vec<u8> {
144 build_layout_args(&[0x08], &[stream_id.to_le_bytes().to_vec()])
145}
146
147fn ensure_readonly_success(
148 result: &crate::client::ReadonlyContractResult,
149 function_name: &str,
150) -> Result<()> {
151 let code = result.return_code.unwrap_or(0);
152 if code != 0 {
153 return Err(Error::RpcError(result.error.clone().unwrap_or_else(|| {
154 format!("SporePay {} returned code {}", function_name, code)
155 })));
156 }
157 if !result.success {
158 return Err(Error::RpcError(
159 result
160 .error
161 .clone()
162 .unwrap_or_else(|| format!("SporePay {} failed", function_name)),
163 ));
164 }
165 Ok(())
166}
167
168fn decode_return_data(
169 result: &crate::client::ReadonlyContractResult,
170 function_name: &str,
171) -> Result<Vec<u8>> {
172 let Some(return_data) = &result.return_data else {
173 return Err(Error::ParseError(format!(
174 "SporePay {} did not return payload data",
175 function_name,
176 )));
177 };
178
179 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, return_data)
180 .map_err(|err| Error::ParseError(err.to_string()))
181}
182
183fn decode_stream(stream_id: u64, bytes: &[u8]) -> Result<SporePayStream> {
184 if bytes.len() < STREAM_SIZE {
185 return Err(Error::ParseError(
186 "SporePay stream payload was shorter than expected".into(),
187 ));
188 }
189
190 Ok(SporePayStream {
191 stream_id,
192 sender: Pubkey(bytes[0..32].try_into().unwrap()).to_base58(),
193 recipient: Pubkey(bytes[32..64].try_into().unwrap()).to_base58(),
194 total_amount: u64::from_le_bytes(bytes[64..72].try_into().unwrap()),
195 withdrawn_amount: u64::from_le_bytes(bytes[72..80].try_into().unwrap()),
196 start_slot: u64::from_le_bytes(bytes[80..88].try_into().unwrap()),
197 end_slot: u64::from_le_bytes(bytes[88..96].try_into().unwrap()),
198 cancelled: bytes[96] == 1,
199 created_slot: u64::from_le_bytes(bytes[97..105].try_into().unwrap()),
200 })
201}
202
203fn decode_stream_info(stream_id: u64, bytes: &[u8]) -> Result<SporePayStreamInfo> {
204 if bytes.len() < STREAM_INFO_SIZE {
205 return Err(Error::ParseError(
206 "SporePay stream-info payload was shorter than expected".into(),
207 ));
208 }
209
210 Ok(SporePayStreamInfo {
211 stream: decode_stream(stream_id, bytes)?,
212 cliff_slot: u64::from_le_bytes(bytes[105..113].try_into().unwrap()),
213 })
214}
215
216impl SporePayClient {
217 pub fn new(client: Client) -> Self {
218 Self {
219 client,
220 program_id: std::sync::Arc::new(std::sync::Mutex::new(None)),
221 }
222 }
223
224 pub fn with_program_id(client: Client, program_id: Pubkey) -> Self {
225 Self {
226 client,
227 program_id: std::sync::Arc::new(std::sync::Mutex::new(Some(program_id))),
228 }
229 }
230
231 pub async fn get_program_id(&self) -> Result<Pubkey> {
232 if let Some(program_id) = self
233 .program_id
234 .lock()
235 .map_err(|_| Error::ConfigError("SporePayClient program cache lock poisoned".into()))?
236 .clone()
237 {
238 return Ok(program_id);
239 }
240
241 for symbol in PROGRAM_SYMBOL_CANDIDATES {
242 let entry = match self.client.get_symbol_registry(symbol).await {
243 Ok(entry) => entry,
244 Err(_) => continue,
245 };
246 let Some(program) = entry.get("program").and_then(|value| value.as_str()) else {
247 continue;
248 };
249 let program_id = Pubkey::from_base58(program).map_err(Error::ParseError)?;
250 *self.program_id.lock().map_err(|_| {
251 Error::ConfigError("SporePayClient program cache lock poisoned".into())
252 })? = Some(program_id);
253 return Ok(program_id);
254 }
255
256 Err(Error::ConfigError(
257 "Unable to resolve the SporePay program via getSymbolRegistry(\"SPOREPAY\")".into(),
258 ))
259 }
260
261 pub async fn get_stream(&self, stream_id: u64) -> Result<Option<SporePayStream>> {
262 let result = self
263 .client
264 .call_readonly_contract(
265 &self.get_program_id().await?,
266 "get_stream",
267 encode_stream_lookup_args(stream_id),
268 None,
269 )
270 .await?;
271
272 if result.return_code == Some(1) || result.return_data.is_none() {
273 return Ok(None);
274 }
275
276 ensure_readonly_success(&result, "get_stream")?;
277 let bytes = decode_return_data(&result, "get_stream")?;
278 decode_stream(stream_id, &bytes).map(Some)
279 }
280
281 pub async fn get_stream_info(&self, stream_id: u64) -> Result<Option<SporePayStreamInfo>> {
282 let result = self
283 .client
284 .call_readonly_contract(
285 &self.get_program_id().await?,
286 "get_stream_info",
287 encode_stream_lookup_args(stream_id),
288 None,
289 )
290 .await?;
291
292 if result.return_code == Some(1) || result.return_data.is_none() {
293 return Ok(None);
294 }
295
296 ensure_readonly_success(&result, "get_stream_info")?;
297 let bytes = decode_return_data(&result, "get_stream_info")?;
298 decode_stream_info(stream_id, &bytes).map(Some)
299 }
300
301 pub async fn get_withdrawable(&self, stream_id: u64) -> Result<u64> {
302 let result = self
303 .client
304 .call_readonly_contract(
305 &self.get_program_id().await?,
306 "get_withdrawable",
307 encode_stream_lookup_args(stream_id),
308 None,
309 )
310 .await?;
311
312 ensure_readonly_success(&result, "get_withdrawable")?;
313 let bytes = decode_return_data(&result, "get_withdrawable")?;
314 if bytes.len() < 8 {
315 return Err(Error::ParseError(
316 "SporePay withdrawable payload was shorter than expected".into(),
317 ));
318 }
319 Ok(u64::from_le_bytes(bytes[..8].try_into().unwrap()))
320 }
321
322 pub async fn get_stats(&self) -> Result<SporePayStats> {
323 let value = self.client.get_sporepay_stats().await?;
324 serde_json::from_value(value).map_err(|err| Error::ParseError(err.to_string()))
325 }
326
327 pub async fn create_stream(
328 &self,
329 sender: &Keypair,
330 params: CreateStreamParams,
331 ) -> Result<String> {
332 let program_id = self.get_program_id().await?;
333 let args = encode_create_stream_args(&sender.pubkey(), ¶ms);
334 self.client
335 .call_contract(sender, &program_id, "create_stream", args, 0)
336 .await
337 }
338
339 pub async fn create_stream_with_cliff(
340 &self,
341 sender: &Keypair,
342 params: CreateStreamWithCliffParams,
343 ) -> Result<String> {
344 let program_id = self.get_program_id().await?;
345 let args = encode_create_stream_with_cliff_args(&sender.pubkey(), ¶ms);
346 self.client
347 .call_contract(sender, &program_id, "create_stream_with_cliff", args, 0)
348 .await
349 }
350
351 pub async fn withdraw_from_stream(
352 &self,
353 recipient: &Keypair,
354 params: WithdrawFromStreamParams,
355 ) -> Result<String> {
356 let program_id = self.get_program_id().await?;
357 let args = encode_withdraw_args(&recipient.pubkey(), ¶ms);
358 self.client
359 .call_contract(recipient, &program_id, "withdraw_from_stream", args, 0)
360 .await
361 }
362
363 pub async fn cancel_stream(&self, sender: &Keypair, stream_id: u64) -> Result<String> {
364 let program_id = self.get_program_id().await?;
365 let args = encode_cancel_args(&sender.pubkey(), stream_id);
366 self.client
367 .call_contract(sender, &program_id, "cancel_stream", args, 0)
368 .await
369 }
370
371 pub async fn transfer_stream(
372 &self,
373 recipient: &Keypair,
374 params: TransferStreamParams,
375 ) -> Result<String> {
376 let program_id = self.get_program_id().await?;
377 let args = encode_transfer_args(&recipient.pubkey(), ¶ms);
378 self.client
379 .call_contract(recipient, &program_id, "transfer_stream", args, 0)
380 .await
381 }
382}
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387
388 #[test]
389 fn create_stream_encoding_includes_all_u64_layout_entries() {
390 let sender = Pubkey([1u8; 32]);
391 let encoded = encode_create_stream_args(
392 &sender,
393 &CreateStreamParams {
394 recipient: Pubkey([2u8; 32]),
395 total_amount: 100,
396 start_slot: 10,
397 end_slot: 20,
398 },
399 );
400 assert_eq!(&encoded[..6], &[0xAB, 0x20, 0x20, 0x08, 0x08, 0x08]);
401 }
402
403 #[test]
404 fn decode_stream_info_reads_cliff_slot() {
405 let mut payload = vec![0u8; STREAM_INFO_SIZE];
406 payload[64..72].copy_from_slice(&100u64.to_le_bytes());
407 payload[72..80].copy_from_slice(&25u64.to_le_bytes());
408 payload[80..88].copy_from_slice(&10u64.to_le_bytes());
409 payload[88..96].copy_from_slice(&20u64.to_le_bytes());
410 payload[97..105].copy_from_slice(&9u64.to_le_bytes());
411 payload[105..113].copy_from_slice(&12u64.to_le_bytes());
412
413 let info = decode_stream_info(7, &payload).unwrap();
414 assert_eq!(info.stream.stream_id, 7);
415 assert_eq!(info.cliff_slot, 12);
416 }
417}