Skip to main content

cdk_supabase/
wallet.rs

1use std::collections::HashMap;
2use std::fmt::Debug;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::{SystemTime, UNIX_EPOCH};
6
7use aes_gcm::aead::{Aead, AeadCore, KeyInit};
8use aes_gcm::{Aes256Gcm, Key, Nonce};
9use async_trait::async_trait;
10use bitcoin::bip32::DerivationPath;
11use bitcoin::secp256k1::rand::rngs::OsRng;
12use bitcoin::secp256k1::rand::RngCore;
13use cdk_common::auth::oidc::OidcClient;
14use cdk_common::common::ProofInfo;
15use cdk_common::database::wallet::Database;
16use cdk_common::database::{Error as DatabaseError, KVStoreDatabase};
17use cdk_common::mint_url::MintUrl;
18use cdk_common::nuts::{
19    CurrencyUnit, Id, KeySet, KeySetInfo, Keys, MintInfo, PublicKey, SpendingConditions, State,
20};
21use cdk_common::secret::Secret;
22use cdk_common::util::hex;
23use cdk_common::wallet::{
24    self, MintQuote, Transaction, TransactionDirection, TransactionId, WalletSaga,
25};
26use reqwest::{Client, StatusCode};
27use scrypt::Params as ScryptParams;
28use serde::{Deserialize, Serialize};
29use tokio::sync::RwLock;
30use url::Url;
31
32use crate::Error;
33
34#[rustfmt::skip]
35mod migrations {
36    include!(concat!(env!("OUT_DIR"), "/migrations_supabase.rs"));
37}
38
39/// Returns the concatenated SQL of all migration files.
40///
41/// Operators can use this to set up the database manually via the Supabase
42/// Dashboard SQL editor or `supabase db push`.
43pub(crate) fn get_schema_sql_inner() -> String {
44    migrations::MIGRATIONS
45        .iter()
46        .map(|(_, _, sql)| *sql)
47        .collect::<Vec<&str>>()
48        .join("\n\n")
49}
50
51/// URL-encode a value for use in query parameters
52fn url_encode(value: &str) -> String {
53    urlencoding::encode(value).into_owned()
54}
55
56const ENCRYPTION_METADATA_VERSION: i64 = 1;
57const ENCRYPTION_KDF: &str = "scrypt";
58const ENCRYPTION_SALT_BYTES: usize = 16;
59const ENCRYPTION_KEY_BYTES: usize = 32;
60const SCRYPT_LOG_N: u8 = 15;
61const SCRYPT_R: u32 = 8;
62const SCRYPT_P: u32 = 1;
63
64/// Decode JWT expiration from token string (without verification)
65fn decode_jwt_expiry(token: &str) -> Option<u64> {
66    let parts: Vec<&str> = token.split('.').collect();
67    if parts.len() < 2 {
68        return None;
69    }
70    let payload_part = parts[1];
71
72    use bitcoin::base64::engine::general_purpose;
73    use bitcoin::base64::Engine as _;
74
75    let decoded = general_purpose::URL_SAFE_NO_PAD.decode(payload_part).ok()?;
76
77    #[derive(Deserialize)]
78    struct Claims {
79        exp: Option<u64>,
80    }
81
82    let claims: Claims = serde_json::from_slice(&decoded).ok()?;
83    claims.exp
84}
85
86/// Authentication provider for Supabase
87///
88/// This enum abstracts the token refresh logic for different authentication methods.
89#[derive(Debug, Clone)]
90pub enum AuthProvider {
91    /// No authentication provider - uses API key only, no automatic token refresh
92    None,
93    /// Supabase Auth (GoTrue) - uses Supabase's built-in authentication
94    ///
95    /// Token refresh uses `POST /auth/v1/token` with `grant_type=refresh_token`
96    SupabaseAuth,
97    /// External OIDC provider - uses standard OIDC discovery and token endpoint
98    Oidc(OidcClient),
99}
100
101/// Response from Supabase Auth token refresh
102#[derive(Debug, Deserialize)]
103struct SupabaseTokenResponse {
104    access_token: String,
105    refresh_token: Option<String>,
106    expires_in: Option<i64>,
107    #[serde(skip)]
108    _token_type: (),
109}
110
111/// Supabase wallet database implementation
112///
113/// This database uses two types of authentication:
114/// - `api_key`: The Supabase project API key (required, used in `apikey` header)
115/// - `jwt_token`: An optional JWT token for user authentication (used in `Authorization: Bearer` header)
116///
117/// When `jwt_token` is set, requests will include both headers:
118/// - `apikey: <api_key>`
119/// - `Authorization: Bearer <jwt_token>`
120///
121/// When `jwt_token` is not set, the `api_key` is used for both headers (legacy behavior).
122///
123/// ## Authentication Providers
124///
125/// The database supports multiple authentication providers via [`AuthProvider`]:
126/// - **None**: No automatic token refresh, use API key only
127/// - **SupabaseAuth**: Uses Supabase's GoTrue API for token refresh
128/// - **Oidc**: Uses an external OIDC provider for token refresh
129#[derive(Debug, Clone)]
130pub struct SupabaseWalletDatabase {
131    url: Url,
132    api_key: String,
133    jwt_token: Arc<RwLock<Option<String>>>,
134    refresh_token: Arc<RwLock<Option<String>>>,
135    token_expiration: Arc<RwLock<Option<u64>>>,
136    auth_provider: Arc<RwLock<AuthProvider>>,
137    client: Client,
138    encryption_key: Arc<RwLock<Option<Key<Aes256Gcm>>>>,
139}
140
141impl SupabaseWalletDatabase {
142    /// Create a new SupabaseWalletDatabase with API key only (legacy behavior)
143    ///
144    /// No automatic token refresh is configured.
145    ///
146    /// **Note**: This does NOT run or check migrations automatically. After
147    /// authentication, call [`check_schema_compatibility()`] to verify the
148    /// database schema is ready. Migrations must be run separately by an
149    /// administrator — see [`get_schema_sql()`] or use `supabase db push`.
150    pub async fn new(url: Url, api_key: String) -> Result<Self, Error> {
151        Ok(Self {
152            url,
153            api_key,
154            jwt_token: Arc::new(RwLock::new(None)),
155            refresh_token: Arc::new(RwLock::new(None)),
156            token_expiration: Arc::new(RwLock::new(None)),
157            auth_provider: Arc::new(RwLock::new(AuthProvider::None)),
158            client: Client::new(),
159            encryption_key: Arc::new(RwLock::new(None)),
160        })
161    }
162
163    /// Create a new SupabaseWalletDatabase with Supabase Auth for token refresh
164    ///
165    /// This uses Supabase's built-in GoTrue authentication system.
166    /// Token refresh uses `POST /auth/v1/token` with `grant_type=refresh_token`.
167    ///
168    /// **Note**: This does NOT run or check migrations automatically. After
169    /// authentication, call [`check_schema_compatibility()`] to verify the
170    /// database schema is ready. Migrations must be run separately by an
171    /// administrator — see [`get_schema_sql()`] or use `supabase db push`.
172    pub async fn with_supabase_auth(url: Url, api_key: String) -> Result<Self, Error> {
173        Ok(Self {
174            url,
175            api_key,
176            jwt_token: Arc::new(RwLock::new(None)),
177            refresh_token: Arc::new(RwLock::new(None)),
178            token_expiration: Arc::new(RwLock::new(None)),
179            auth_provider: Arc::new(RwLock::new(AuthProvider::SupabaseAuth)),
180            client: Client::new(),
181            encryption_key: Arc::new(RwLock::new(None)),
182        })
183    }
184
185    /// Create a new SupabaseWalletDatabase with external OIDC client for auth
186    ///
187    /// This uses an external OIDC provider (e.g., Keycloak, Auth0) for token refresh.
188    /// The OIDC provider must be configured in Supabase to validate the JWTs.
189    ///
190    /// **Note**: This does NOT run or check migrations automatically. After
191    /// authentication, call [`check_schema_compatibility()`] to verify the
192    /// database schema is ready. Migrations must be run separately by an
193    /// administrator — see [`get_schema_sql()`] or use `supabase db push`.
194    pub async fn with_oidc(
195        url: Url,
196        api_key: String,
197        oidc_client: OidcClient,
198    ) -> Result<Self, Error> {
199        Ok(Self {
200            url,
201            api_key,
202            jwt_token: Arc::new(RwLock::new(None)),
203            refresh_token: Arc::new(RwLock::new(None)),
204            token_expiration: Arc::new(RwLock::new(None)),
205            auth_provider: Arc::new(RwLock::new(AuthProvider::Oidc(oidc_client))),
206            client: Client::new(),
207            encryption_key: Arc::new(RwLock::new(None)),
208        })
209    }
210
211    /// The schema version required by this SDK version.
212    ///
213    /// This must match the latest `schema_version` value set in the migration files.
214    /// When adding new migrations, update this constant and set the same value
215    /// in the new migration's `INSERT INTO schema_info` statement.
216    pub const REQUIRED_SCHEMA_VERSION: u32 = 7;
217
218    /// Get the full database schema SQL
219    ///
220    /// Returns the concatenated SQL of all migration files.
221    ///
222    /// Use this to set up or update the database schema by running the output
223    /// through the Supabase Dashboard SQL editor or `supabase db push`.
224    /// This is an **admin-only operation** — never run this from a client app.
225    pub fn get_schema_sql() -> String {
226        get_schema_sql_inner()
227    }
228
229    /// Check that the database schema is compatible with this SDK version
230    ///
231    /// This is the **recommended client-side startup check**. It queries the
232    /// `schema_info` table (which is readable by all authenticated users) to
233    /// verify the database has the required schema version.
234    ///
235    /// # Errors
236    ///
237    /// - [`Error::SchemaNotInitialized`] if the `schema_info` table doesn't exist
238    ///   (database was never set up or is running a pre-v4 schema).
239    /// - [`Error::SchemaMismatch`] if the database schema version is older than
240    ///   what this SDK version requires.
241    ///
242    /// # Example
243    ///
244    /// ```rust,ignore
245    /// // Call after authentication, before using the database
246    /// db.check_schema_compatibility().await?;
247    /// // Database is ready for use
248    /// ```
249    pub async fn check_schema_compatibility(&self) -> Result<(), Error> {
250        let path = "rest/v1/schema_info?key=eq.schema_version&select=value";
251
252        let result = self.get_request(path).await;
253
254        match result {
255            Ok((status, text)) => {
256                if status == StatusCode::NOT_FOUND
257                    || text.contains("relation")
258                    || text.contains("does not exist")
259                {
260                    return Err(Error::SchemaNotInitialized);
261                }
262
263                if !status.is_success() {
264                    // If we get a 404-like error or permission error, schema_info
265                    // table likely doesn't exist
266                    return Err(Error::SchemaNotInitialized);
267                }
268
269                // Parse the response: [{"value": "4"}] or []
270                let items: Vec<serde_json::Value> =
271                    serde_json::from_str(&text).map_err(|_| Error::SchemaNotInitialized)?;
272
273                if items.is_empty() {
274                    return Err(Error::SchemaNotInitialized);
275                }
276
277                let version_str = items[0]
278                    .get("value")
279                    .and_then(|v| v.as_str())
280                    .ok_or(Error::SchemaNotInitialized)?;
281
282                let found_version: u32 = version_str
283                    .parse()
284                    .map_err(|_| Error::SchemaNotInitialized)?;
285
286                if found_version < Self::REQUIRED_SCHEMA_VERSION {
287                    return Err(Error::SchemaMismatch {
288                        required: Self::REQUIRED_SCHEMA_VERSION,
289                        found: found_version,
290                    });
291                }
292
293                tracing::info!(
294                    schema_version = found_version,
295                    required = Self::REQUIRED_SCHEMA_VERSION,
296                    "Database schema compatibility check passed"
297                );
298
299                Ok(())
300            }
301            Err(_) => Err(Error::SchemaNotInitialized),
302        }
303    }
304
305    /// Set or update the JWT token for authentication
306    pub async fn set_jwt_token(&self, token: Option<String>) {
307        let mut jwt = self.jwt_token.write().await;
308        *jwt = token.clone();
309
310        let mut expiration = self.token_expiration.write().await;
311
312        if let Some(t) = token {
313            *expiration = decode_jwt_expiry(&t);
314        } else {
315            *expiration = None;
316        }
317    }
318
319    /// Set refresh token
320    pub async fn set_refresh_token(&self, token: Option<String>) {
321        let mut refresh = self.refresh_token.write().await;
322        *refresh = token;
323    }
324
325    /// Derives an AES-256-GCM encryption key from `password` via scrypt.
326    pub async fn set_encryption_password(&self, password: &str) -> Result<(), Error> {
327        let metadata = self.get_or_create_encryption_metadata().await?;
328        let key = Self::derive_encryption_key(password, &metadata)?;
329
330        let mut encryption_key = self.encryption_key.write().await;
331        *encryption_key = Some(key);
332        Ok(())
333    }
334
335    async fn get_or_create_encryption_metadata(&self) -> Result<EncryptionMetadataTable, Error> {
336        if let Some(metadata) = self.get_encryption_metadata().await? {
337            return Ok(metadata);
338        }
339
340        let metadata = EncryptionMetadataTable::new();
341        let (status, text) = self
342            .insert_request("rest/v1/wallet_encryption_metadata", &metadata)
343            .await?;
344
345        match status {
346            s if s.is_success() => Ok(metadata),
347            StatusCode::CONFLICT => self.get_encryption_metadata().await?.ok_or_else(|| {
348                Error::Supabase(
349                    "wallet encryption metadata conflicted but could not be loaded".to_string(),
350                )
351            }),
352            _ => Err(Error::Supabase(format!(
353                "wallet encryption metadata insert failed: HTTP {} - {}",
354                status, text
355            ))),
356        }
357    }
358
359    async fn get_encryption_metadata(&self) -> Result<Option<EncryptionMetadataTable>, Error> {
360        let path = "rest/v1/wallet_encryption_metadata?select=version,kdf,salt,scrypt_log_n,scrypt_r,scrypt_p&limit=1";
361        let (status, text) = self.get_request(path).await?;
362
363        if !status.is_success() && status != StatusCode::NO_CONTENT {
364            return Err(Error::Supabase(format!(
365                "wallet encryption metadata lookup failed: HTTP {} - {}",
366                status, text
367            )));
368        }
369
370        Ok(Self::parse_response::<EncryptionMetadataTable>(&text)?
371            .and_then(|rows| rows.into_iter().next()))
372    }
373
374    fn derive_encryption_key(
375        password: &str,
376        metadata: &EncryptionMetadataTable,
377    ) -> Result<Key<Aes256Gcm>, Error> {
378        if metadata.version != ENCRYPTION_METADATA_VERSION || metadata.kdf != ENCRYPTION_KDF {
379            return Err(Error::Supabase(format!(
380                "unsupported wallet encryption metadata: version={}, kdf={}",
381                metadata.version, metadata.kdf
382            )));
383        }
384
385        let salt = hex::decode(&metadata.salt)
386            .map_err(|_| Error::Supabase("invalid wallet encryption salt".to_string()))?;
387        let log_n = u8::try_from(metadata.scrypt_log_n)
388            .map_err(|_| Error::Supabase("invalid wallet encryption scrypt log_n".to_string()))?;
389        let r = u32::try_from(metadata.scrypt_r)
390            .map_err(|_| Error::Supabase("invalid wallet encryption scrypt r".to_string()))?;
391        let p = u32::try_from(metadata.scrypt_p)
392            .map_err(|_| Error::Supabase("invalid wallet encryption scrypt p".to_string()))?;
393        let params = ScryptParams::new(log_n, r, p)
394            .map_err(|_| Error::Supabase("invalid wallet encryption scrypt params".to_string()))?;
395        let mut key = [0u8; ENCRYPTION_KEY_BYTES];
396        scrypt::scrypt(password.as_bytes(), &salt, &params, &mut key)
397            .map_err(|_| Error::Supabase("wallet encryption key derivation failed".to_string()))?;
398
399        Ok(*Key::<Aes256Gcm>::from_slice(&key))
400    }
401
402    async fn encrypt(&self, data: &[u8]) -> Result<Vec<u8>, DatabaseError> {
403        let key_guard = self.encryption_key.read().await;
404        let key = key_guard
405            .as_ref()
406            .ok_or(DatabaseError::Internal("Encryption key not set".into()))?;
407        let cipher = Aes256Gcm::new(key);
408        let nonce = Aes256Gcm::generate_nonce(&mut OsRng); // 96-bits; unique per message
409        let ciphertext = cipher
410            .encrypt(&nonce, data)
411            .map_err(|_| DatabaseError::Internal("Encryption failed".into()))?;
412
413        // Prepend nonce to ciphertext
414        let mut result = nonce.to_vec();
415        result.extend_from_slice(&ciphertext);
416        Ok(result)
417    }
418
419    async fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, DatabaseError> {
420        let key_guard = self.encryption_key.read().await;
421        let key = key_guard
422            .as_ref()
423            .ok_or(DatabaseError::Internal("Encryption key not set".into()))?;
424        let cipher = Aes256Gcm::new(key);
425
426        if data.len() < 12 {
427            return Err(DatabaseError::Internal("Invalid ciphertext length".into()));
428        }
429
430        let nonce = Nonce::from_slice(&data[0..12]);
431        let ciphertext = &data[12..];
432
433        cipher
434            .decrypt(nonce, ciphertext)
435            .map_err(|_| DatabaseError::Internal("Decryption failed".into()))
436    }
437
438    async fn decrypt_proof_table(&self, p: &mut ProofTable) {
439        // Decrypt secret
440        if let Ok(encrypted_bytes) = hex::decode(&p.secret) {
441            if let Ok(decrypted) = self.decrypt(&encrypted_bytes).await {
442                if let Ok(secret_str) = String::from_utf8(decrypted) {
443                    p.secret = secret_str;
444                }
445            }
446        }
447
448        // Decrypt C
449        if let Ok(encrypted_c) = hex::decode(&p.c) {
450            if let Ok(decrypted_c) = self.decrypt(&encrypted_c).await {
451                p.c = hex::encode(decrypted_c);
452            }
453        }
454    }
455
456    /// Refresh the access token using the stored refresh token
457    ///
458    /// This method handles different authentication providers:
459    /// - **SupabaseAuth**: Uses `POST /auth/v1/token` with `grant_type=refresh_token`
460    /// - **Oidc**: Uses the OIDC provider's token endpoint
461    /// - **None**: Returns an error (no provider configured)
462    pub async fn refresh_access_token(&self) -> Result<(), Error> {
463        let refresh_token = self.refresh_token.read().await.clone();
464        let refresh = refresh_token
465            .ok_or_else(|| Error::Supabase("No refresh token available".to_string()))?;
466
467        let auth_provider = self.auth_provider.read().await.clone();
468
469        match auth_provider {
470            AuthProvider::None => {
471                return Err(Error::Supabase(
472                    "No authentication provider configured".to_string(),
473                ));
474            }
475            AuthProvider::SupabaseAuth => {
476                // Use Supabase GoTrue API for token refresh
477                let auth_url = self
478                    .url
479                    .join("auth/v1/token?grant_type=refresh_token")
480                    .map_err(|e| Error::Supabase(format!("Invalid auth URL: {}", e)))?;
481
482                let body = serde_json::json!({
483                    "refresh_token": refresh
484                });
485
486                let response = self
487                    .client
488                    .post(auth_url)
489                    .header("apikey", &self.api_key)
490                    .header("Content-Type", "application/json")
491                    .json(&body)
492                    .send()
493                    .await
494                    .map_err(Error::Reqwest)?;
495
496                let status = response.status();
497                if !status.is_success() {
498                    let text = response.text().await.unwrap_or_default();
499                    return Err(Error::Supabase(format!(
500                        "Supabase token refresh failed: HTTP {} - {}",
501                        status, text
502                    )));
503                }
504
505                let token_response: SupabaseTokenResponse =
506                    response.json().await.map_err(Error::Reqwest)?;
507
508                self.set_jwt_token(Some(token_response.access_token)).await;
509
510                if let Some(new_refresh) = token_response.refresh_token {
511                    self.set_refresh_token(Some(new_refresh)).await;
512                }
513
514                if let Some(expires_in) = token_response.expires_in {
515                    let expiration = SystemTime::now()
516                        .duration_since(UNIX_EPOCH)
517                        .map_err(|e| Error::Supabase(format!("SystemTime error: {}", e)))?
518                        .as_secs()
519                        + expires_in as u64;
520                    let mut exp = self.token_expiration.write().await;
521                    *exp = Some(expiration);
522                }
523            }
524            AuthProvider::Oidc(oidc) => {
525                let client_id = oidc.client_id().ok_or_else(|| {
526                    Error::Supabase("Client ID not set in OIDC client".to_string())
527                })?;
528
529                let response = oidc
530                    .refresh_access_token(client_id, refresh)
531                    .await
532                    .map_err(|e| Error::Supabase(e.to_string()))?;
533
534                self.set_jwt_token(Some(response.access_token)).await;
535
536                if let Some(new_refresh) = response.refresh_token {
537                    self.set_refresh_token(Some(new_refresh)).await;
538                }
539
540                if let Some(expires_in) = response.expires_in {
541                    let expiration = SystemTime::now()
542                        .duration_since(UNIX_EPOCH)
543                        .map_err(|e| Error::Supabase(format!("SystemTime error: {}", e)))?
544                        .as_secs()
545                        + expires_in as u64;
546                    let mut exp = self.token_expiration.write().await;
547                    *exp = Some(expiration);
548                }
549            }
550        }
551
552        Ok(())
553    }
554
555    /// Sign up a new user and automatically set tokens if returned
556    pub async fn signup(&self, email: &str, password: &str) -> Result<SupabaseAuthResponse, Error> {
557        let response = SupabaseAuth::signup(&self.url, &self.api_key, email, password).await?;
558
559        // If signup returns valid tokens (e.g. auto-confirm enabled), set them
560        if !response.access_token.is_empty() {
561            self.set_jwt_token(Some(response.access_token.clone()))
562                .await;
563        }
564        if let Some(refresh) = &response.refresh_token {
565            self.set_refresh_token(Some(refresh.clone())).await;
566        }
567
568        Ok(response)
569    }
570
571    /// Sign in a user and automatically set tokens on the database instance
572    pub async fn signin(&self, email: &str, password: &str) -> Result<SupabaseAuthResponse, Error> {
573        let response = SupabaseAuth::signin(&self.url, &self.api_key, email, password).await?;
574
575        self.set_jwt_token(Some(response.access_token.clone()))
576            .await;
577        if let Some(refresh) = &response.refresh_token {
578            self.set_refresh_token(Some(refresh.clone())).await;
579        }
580        if let Some(expires_in) = response.expires_in {
581            let expiration = SystemTime::now()
582                .duration_since(UNIX_EPOCH)
583                .map_err(|e| Error::Supabase(format!("SystemTime error: {}", e)))?
584                .as_secs()
585                + expires_in as u64;
586            let mut exp = self.token_expiration.write().await;
587            *exp = Some(expiration);
588        }
589
590        Ok(response)
591    }
592
593    /// Get the current JWT token if set
594    pub async fn get_jwt_token(&self) -> Option<String> {
595        self.jwt_token.read().await.clone()
596    }
597
598    /// Call a Supabase RPC function with JSON parameters
599    pub async fn call_rpc(&self, function_name: &str, params_json: &str) -> Result<String, Error> {
600        // Parse the JSON to validate it and convert to Value for sending
601        // Treat empty string as empty object for convenience
602        let params: serde_json::Value = if params_json.trim().is_empty() {
603            serde_json::Value::Object(serde_json::Map::new())
604        } else {
605            serde_json::from_str(params_json).map_err(Error::Serde)?
606        };
607
608        let path = format!("rest/v1/rpc/{}", url_encode(function_name));
609        let url = self.join_url(&path)?;
610        let auth_bearer = self.get_auth_bearer().await;
611
612        let res = self
613            .client
614            .post(url.clone())
615            .header("apikey", &self.api_key)
616            .header("Authorization", format!("Bearer {}", auth_bearer))
617            .header("Content-Type", "application/json")
618            .json(&params)
619            .send()
620            .await
621            .map_err(Error::Reqwest)?;
622
623        let status = res.status();
624        let text = res.text().await.map_err(Error::Reqwest)?;
625
626        if !status.is_success() {
627            return Err(Error::Supabase(format!(
628                "RPC '{}' failed: HTTP {} - {}",
629                function_name, status, text
630            )));
631        }
632
633        Ok(text)
634    }
635
636    /// Get the authorization token to use for requests
637    ///
638    /// Returns the JWT token if set, otherwise falls back to the API key.
639    async fn get_auth_bearer(&self) -> String {
640        // Check expiration
641        let expiration = *self.token_expiration.read().await;
642        if let Some(exp) = expiration {
643            let now = SystemTime::now()
644                .duration_since(UNIX_EPOCH)
645                .expect("SystemTime should be after UNIX_EPOCH")
646                .as_secs();
647            // Refresh if expired or expiring in 60 seconds
648            if now + 60 > exp {
649                if let Err(e) = self.refresh_access_token().await {
650                    tracing::warn!("Failed to refresh token: {}", e);
651                }
652            }
653        }
654
655        self.jwt_token
656            .read()
657            .await
658            .clone()
659            .unwrap_or_else(|| self.api_key.clone())
660    }
661
662    /// Join the base URL with a path
663    pub fn join_url(&self, path: &str) -> Result<Url, DatabaseError> {
664        self.url
665            .join(path)
666            .map_err(|e| DatabaseError::Internal(e.to_string()))
667    }
668
669    /// Make a GET request and return the response text
670    async fn get_request(&self, path: &str) -> Result<(StatusCode, String), Error> {
671        let url = self.join_url(path)?;
672        let auth_bearer = self.get_auth_bearer().await;
673
674        tracing::debug!(method = "GET", url = %url, "Supabase request");
675
676        let res = self
677            .client
678            .get(url.clone())
679            .header("apikey", &self.api_key)
680            .header("Authorization", format!("Bearer {}", auth_bearer))
681            .send()
682            .await
683            .map_err(Error::Reqwest)?;
684
685        let status = res.status();
686        let text = res.text().await.map_err(Error::Reqwest)?;
687
688        tracing::debug!(method = "GET", url = %url, status = %status, response_len = text.len(), "Supabase response");
689
690        Ok((status, text))
691    }
692
693    /// Make a POST request with JSON body
694    async fn post_request<T: Serialize + Debug>(
695        &self,
696        path: &str,
697        body: &T,
698    ) -> Result<(StatusCode, String), Error> {
699        let url = self.join_url(path)?;
700        let auth_bearer = self.get_auth_bearer().await;
701
702        tracing::debug!(method = "POST", url = %url, "Supabase request");
703
704        let res = self
705            .client
706            .post(url.clone())
707            .header("apikey", &self.api_key)
708            .header("Authorization", format!("Bearer {}", auth_bearer))
709            .header("Prefer", "resolution=merge-duplicates,missing=default")
710            .json(body)
711            .send()
712            .await
713            .map_err(Error::Reqwest)?;
714
715        let status = res.status();
716        let text = res.text().await.map_err(Error::Reqwest)?;
717
718        tracing::debug!(method = "POST", url = %url, status = %status, response_len = text.len(), "Supabase response");
719
720        Ok((status, text))
721    }
722
723    /// Make a plain INSERT POST request (no upsert/merge).
724    ///
725    /// Unlike [`post_request`], this does NOT send `Prefer: resolution=merge-duplicates`,
726    /// so PostgREST returns `409 Conflict` if a row with the same primary key already
727    /// exists. This is used by the optimistic-locking insert path so a concurrent
728    /// row can be detected instead of silently overwritten.
729    async fn insert_request<T: Serialize + Debug>(
730        &self,
731        path: &str,
732        body: &T,
733    ) -> Result<(StatusCode, String), Error> {
734        let url = self.join_url(path)?;
735        let auth_bearer = self.get_auth_bearer().await;
736
737        tracing::debug!(method = "POST", url = %url, "Supabase insert request");
738
739        let res = self
740            .client
741            .post(url.clone())
742            .header("apikey", &self.api_key)
743            .header("Authorization", format!("Bearer {}", auth_bearer))
744            .header("Prefer", "missing=default")
745            .json(body)
746            .send()
747            .await
748            .map_err(Error::Reqwest)?;
749
750        let status = res.status();
751        let text = res.text().await.map_err(Error::Reqwest)?;
752
753        tracing::debug!(method = "POST", url = %url, status = %status, response_len = text.len(), "Supabase insert response");
754
755        Ok((status, text))
756    }
757
758    /// Make a PATCH request with JSON body
759    async fn patch_request<T: Serialize + Debug>(
760        &self,
761        path: &str,
762        body: &T,
763    ) -> Result<(StatusCode, String), Error> {
764        let url = self.join_url(path)?;
765        let auth_bearer = self.get_auth_bearer().await;
766
767        tracing::debug!(method = "PATCH", url = %url, "Supabase request");
768
769        let res = self
770            .client
771            .patch(url.clone())
772            .header("apikey", &self.api_key)
773            .header("Authorization", format!("Bearer {}", auth_bearer))
774            .json(body)
775            .send()
776            .await
777            .map_err(Error::Reqwest)?;
778
779        let status = res.status();
780        let text = res.text().await.map_err(Error::Reqwest)?;
781
782        tracing::debug!(method = "PATCH", url = %url, status = %status, response_len = text.len(), "Supabase response");
783
784        Ok((status, text))
785    }
786
787    /// Make a PATCH request and ask PostgREST to return the updated rows as JSON
788    /// (`Prefer: return=representation`).  Returns `(status, body)` where body is
789    /// an empty JSON array `[]` when the filter matched no rows.
790    async fn patch_request_returning<T: Serialize + Debug>(
791        &self,
792        path: &str,
793        body: &T,
794    ) -> Result<(StatusCode, String), Error> {
795        let url = self.join_url(path)?;
796        let auth_bearer = self.get_auth_bearer().await;
797
798        tracing::debug!(method = "PATCH", url = %url, "Supabase request (returning)");
799
800        let res = self
801            .client
802            .patch(url.clone())
803            .header("apikey", &self.api_key)
804            .header("Authorization", format!("Bearer {}", auth_bearer))
805            .header("Prefer", "return=representation")
806            .json(body)
807            .send()
808            .await
809            .map_err(Error::Reqwest)?;
810
811        let status = res.status();
812        let text = res.text().await.map_err(Error::Reqwest)?;
813
814        tracing::debug!(method = "PATCH", url = %url, status = %status, response_len = text.len(), "Supabase response (returning)");
815
816        Ok((status, text))
817    }
818
819    /// Make a DELETE request
820    async fn delete_request(&self, path: &str) -> Result<(StatusCode, String), Error> {
821        let url = self.join_url(path)?;
822        let auth_bearer = self.get_auth_bearer().await;
823
824        tracing::debug!(method = "DELETE", url = %url, "Supabase request");
825
826        let res = self
827            .client
828            .delete(url.clone())
829            .header("apikey", &self.api_key)
830            .header("Authorization", format!("Bearer {}", auth_bearer))
831            .send()
832            .await
833            .map_err(Error::Reqwest)?;
834
835        let status = res.status();
836        let text = res.text().await.map_err(Error::Reqwest)?;
837
838        tracing::debug!(method = "DELETE", url = %url, status = %status, response_len = text.len(), "Supabase response");
839
840        Ok((status, text))
841    }
842
843    /// Parse a JSON response, returning None for empty responses
844    fn parse_response<T: serde::de::DeserializeOwned>(text: &str) -> Result<Option<Vec<T>>, Error> {
845        if text.trim().is_empty() || text.trim() == "[]" {
846            return Ok(None);
847        }
848        let items: Vec<T> = serde_json::from_str(text).map_err(Error::Serde)?;
849        if items.is_empty() {
850            Ok(None)
851        } else {
852            Ok(Some(items))
853        }
854    }
855}
856
857#[async_trait]
858impl KVStoreDatabase for SupabaseWalletDatabase {
859    type Err = DatabaseError;
860
861    async fn kv_read(
862        &self,
863        primary_namespace: &str,
864        secondary_namespace: &str,
865        key: &str,
866    ) -> Result<Option<Vec<u8>>, Self::Err> {
867        let path = format!(
868            "rest/v1/kv_store?primary_namespace=eq.{}&secondary_namespace=eq.{}&key=eq.{}",
869            url_encode(primary_namespace),
870            url_encode(secondary_namespace),
871            url_encode(key)
872        );
873
874        let (status, text) = self.get_request(&path).await?;
875
876        if status == StatusCode::NO_CONTENT || !status.is_success() {
877            if !status.is_success() && status != StatusCode::NO_CONTENT {
878                return Err(DatabaseError::Internal(format!(
879                    "kv_read failed: HTTP {}",
880                    status
881                )));
882            }
883            return Ok(None);
884        }
885
886        if let Some(items) = Self::parse_response::<KVStoreTable>(&text)? {
887            if let Some(item) = items.into_iter().next() {
888                let bytes = hex::decode(item.value)
889                    .map_err(|_| DatabaseError::Internal("Invalid hex in kv_store".into()))?;
890
891                // Decrypt value
892                let decrypted = self.decrypt(&bytes).await?;
893                return Ok(Some(decrypted));
894            }
895        }
896        Ok(None)
897    }
898
899    async fn kv_list(
900        &self,
901        primary_namespace: &str,
902        secondary_namespace: &str,
903    ) -> Result<Vec<String>, Self::Err> {
904        let path = format!(
905            "rest/v1/kv_store?primary_namespace=eq.{}&secondary_namespace=eq.{}",
906            url_encode(primary_namespace),
907            url_encode(secondary_namespace)
908        );
909
910        let (status, text) = self.get_request(&path).await?;
911
912        if !status.is_success() {
913            return Err(DatabaseError::Internal(format!(
914                "kv_list failed: HTTP {}",
915                status
916            )));
917        }
918
919        if let Some(items) = Self::parse_response::<KVStoreTable>(&text)? {
920            Ok(items.into_iter().map(|i| i.key).collect())
921        } else {
922            Ok(Vec::new())
923        }
924    }
925}
926#[async_trait]
927impl Database<DatabaseError> for SupabaseWalletDatabase {
928    async fn get_mint(&self, mint_url: MintUrl) -> Result<Option<MintInfo>, DatabaseError> {
929        let path = format!(
930            "rest/v1/mint?mint_url=eq.{}",
931            url_encode(&mint_url.to_string())
932        );
933        let (status, text) = self.get_request(&path).await?;
934
935        // 404 or empty result means not found
936        if status == StatusCode::NOT_FOUND {
937            return Ok(None);
938        }
939        if !status.is_success() {
940            return Err(DatabaseError::Internal(format!(
941                "get_mint failed: HTTP {}",
942                status
943            )));
944        }
945
946        if let Some(mints) = Self::parse_response::<MintTable>(&text)? {
947            if let Some(mint) = mints.into_iter().next() {
948                return Ok(Some(mint.try_into()?));
949            }
950        }
951        Ok(None)
952    }
953
954    async fn get_mints(&self) -> Result<HashMap<MintUrl, Option<MintInfo>>, DatabaseError> {
955        let (status, text) = self.get_request("rest/v1/mint").await?;
956
957        if !status.is_success() {
958            return Err(DatabaseError::Internal(format!(
959                "get_mints failed: HTTP {}",
960                status
961            )));
962        }
963
964        let mut map = HashMap::new();
965        if let Some(mints) = Self::parse_response::<MintTable>(&text)? {
966            for mint in mints {
967                map.insert(
968                    MintUrl::from_str(&mint.mint_url)
969                        .map_err(|e| DatabaseError::Internal(e.to_string()))?,
970                    Some(mint.try_into()?),
971                );
972            }
973        }
974        Ok(map)
975    }
976
977    async fn get_mint_keysets(
978        &self,
979        mint_url: MintUrl,
980    ) -> Result<Option<Vec<KeySetInfo>>, DatabaseError> {
981        let path = format!(
982            "rest/v1/keyset?mint_url=eq.{}",
983            url_encode(&mint_url.to_string())
984        );
985        let (status, text) = self.get_request(&path).await?;
986
987        if !status.is_success() {
988            return Err(DatabaseError::Internal(format!(
989                "get_mint_keysets failed: HTTP {}",
990                status
991            )));
992        }
993
994        if let Some(keysets) = Self::parse_response::<KeySetTable>(&text)? {
995            let result: Result<Vec<KeySetInfo>, _> =
996                keysets.into_iter().map(|ks| ks.try_into()).collect();
997            Ok(Some(result?))
998        } else {
999            Ok(None)
1000        }
1001    }
1002
1003    async fn get_keyset_by_id(&self, keyset_id: &Id) -> Result<Option<KeySetInfo>, DatabaseError> {
1004        let path = format!(
1005            "rest/v1/keyset?id=eq.{}",
1006            url_encode(&keyset_id.to_string())
1007        );
1008        let (status, text) = self.get_request(&path).await?;
1009
1010        if !status.is_success() {
1011            return Err(DatabaseError::Internal(format!(
1012                "get_keyset_by_id failed: HTTP {}",
1013                status
1014            )));
1015        }
1016
1017        if let Some(items) = Self::parse_response::<KeySetTable>(&text)? {
1018            if let Some(item) = items.into_iter().next() {
1019                return Ok(Some(item.try_into()?));
1020            }
1021        }
1022        Ok(None)
1023    }
1024
1025    async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, DatabaseError> {
1026        let path = format!("rest/v1/mint_quote?id=eq.{}", url_encode(quote_id));
1027        let (status, text) = self.get_request(&path).await?;
1028
1029        if !status.is_success() {
1030            return Err(DatabaseError::Internal(format!(
1031                "get_mint_quote failed: HTTP {}",
1032                status
1033            )));
1034        }
1035
1036        if let Some(items) = Self::parse_response::<MintQuoteTable>(&text)? {
1037            if let Some(item) = items.into_iter().next() {
1038                return Ok(Some(item.try_into()?));
1039            }
1040        }
1041        Ok(None)
1042    }
1043
1044    async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, DatabaseError> {
1045        let (status, text) = self.get_request("rest/v1/mint_quote").await?;
1046
1047        if !status.is_success() {
1048            return Err(DatabaseError::Internal(format!(
1049                "get_mint_quotes failed: HTTP {}",
1050                status
1051            )));
1052        }
1053
1054        if let Some(quotes) = Self::parse_response::<MintQuoteTable>(&text)? {
1055            quotes.into_iter().map(|q| q.try_into()).collect()
1056        } else {
1057            Ok(Vec::new())
1058        }
1059    }
1060
1061    async fn get_unissued_mint_quotes(&self) -> Result<Vec<MintQuote>, DatabaseError> {
1062        let (status, text) = self
1063            .get_request("rest/v1/mint_quote?amount_issued=eq.0")
1064            .await?;
1065
1066        if !status.is_success() {
1067            return Err(DatabaseError::Internal(format!(
1068                "get_unissued_mint_quotes failed: HTTP {}",
1069                status
1070            )));
1071        }
1072
1073        if let Some(quotes) = Self::parse_response::<MintQuoteTable>(&text)? {
1074            quotes.into_iter().map(|q| q.try_into()).collect()
1075        } else {
1076            Ok(Vec::new())
1077        }
1078    }
1079
1080    async fn get_melt_quote(
1081        &self,
1082        quote_id: &str,
1083    ) -> Result<Option<wallet::MeltQuote>, DatabaseError> {
1084        let path = format!("rest/v1/melt_quote?id=eq.{}", url_encode(quote_id));
1085        let (status, text) = self.get_request(&path).await?;
1086
1087        if !status.is_success() {
1088            return Err(DatabaseError::Internal(format!(
1089                "get_melt_quote failed: HTTP {}",
1090                status
1091            )));
1092        }
1093
1094        if let Some(items) = Self::parse_response::<MeltQuoteTable>(&text)? {
1095            if let Some(item) = items.into_iter().next() {
1096                return Ok(Some(item.try_into()?));
1097            }
1098        }
1099        Ok(None)
1100    }
1101
1102    async fn get_melt_quotes(&self) -> Result<Vec<wallet::MeltQuote>, DatabaseError> {
1103        let (status, text) = self.get_request("rest/v1/melt_quote").await?;
1104
1105        if !status.is_success() {
1106            return Err(DatabaseError::Internal(format!(
1107                "get_melt_quotes failed: HTTP {}",
1108                status
1109            )));
1110        }
1111
1112        if let Some(quotes) = Self::parse_response::<MeltQuoteTable>(&text)? {
1113            quotes.into_iter().map(|q| q.try_into()).collect()
1114        } else {
1115            Ok(Vec::new())
1116        }
1117    }
1118
1119    async fn get_keys(&self, id: &Id) -> Result<Option<Keys>, DatabaseError> {
1120        let path = format!("rest/v1/key?id=eq.{}", url_encode(&id.to_string()));
1121        let (status, text) = self.get_request(&path).await?;
1122
1123        if !status.is_success() {
1124            return Err(DatabaseError::Internal(format!(
1125                "get_keys failed: HTTP {}",
1126                status
1127            )));
1128        }
1129
1130        if let Some(items) = Self::parse_response::<KeyTable>(&text)? {
1131            if let Some(item) = items.into_iter().next() {
1132                return Ok(Some(item.try_into()?));
1133            }
1134        }
1135        Ok(None)
1136    }
1137
1138    async fn get_proofs(
1139        &self,
1140        mint_url: Option<MintUrl>,
1141        unit: Option<CurrencyUnit>,
1142        state: Option<Vec<State>>,
1143        spending_conditions: Option<Vec<SpendingConditions>>,
1144    ) -> Result<Vec<ProofInfo>, DatabaseError> {
1145        let mut query = String::from("rest/v1/proof?select=*");
1146        if let Some(url) = mint_url {
1147            query.push_str(&format!("&mint_url=eq.{}", url_encode(&url.to_string())));
1148        }
1149        if let Some(u) = unit {
1150            query.push_str(&format!("&unit=eq.{}", url_encode(&u.to_string())));
1151        }
1152        if let Some(states) = state {
1153            let s_str: Vec<String> = states.iter().map(|s| s.to_string()).collect();
1154            query.push_str(&format!("&state=in.({})", s_str.join(",")));
1155        }
1156
1157        let (status, text) = self.get_request(&query).await?;
1158
1159        if !status.is_success() {
1160            return Err(DatabaseError::Internal(format!(
1161                "get_proofs failed: HTTP {}",
1162                status
1163            )));
1164        }
1165
1166        let mut result = Vec::new();
1167        if let Some(proofs) = Self::parse_response::<ProofTable>(&text)? {
1168            for mut p in proofs {
1169                self.decrypt_proof_table(&mut p).await;
1170
1171                result.push(p.try_into()?);
1172            }
1173        }
1174
1175        // Filter by spending conditions in memory if specified
1176        if let Some(conds) = spending_conditions {
1177            result.retain(|p: &ProofInfo| {
1178                if let Some(sc) = &p.spending_condition {
1179                    conds.contains(sc)
1180                } else {
1181                    false
1182                }
1183            });
1184        }
1185
1186        Ok(result)
1187    }
1188
1189    async fn get_proofs_by_ys(&self, ys: Vec<PublicKey>) -> Result<Vec<ProofInfo>, DatabaseError> {
1190        if ys.is_empty() {
1191            return Ok(Vec::new());
1192        }
1193
1194        let ys_str: Vec<String> = ys.iter().map(|y| hex::encode(y.to_bytes())).collect();
1195        let filter = format!("({})", ys_str.join(","));
1196        let path = format!("rest/v1/proof?y=in.{}", filter);
1197
1198        let (status, text) = self.get_request(&path).await?;
1199
1200        if !status.is_success() {
1201            return Err(DatabaseError::Internal(format!(
1202                "get_proofs_by_ys failed: HTTP {}",
1203                status
1204            )));
1205        }
1206
1207        if let Some(proofs) = Self::parse_response::<ProofTable>(&text)? {
1208            let mut result = Vec::new();
1209            for mut p in proofs {
1210                self.decrypt_proof_table(&mut p).await;
1211
1212                result.push(p.try_into()?);
1213            }
1214            Ok(result)
1215        } else {
1216            Ok(Vec::new())
1217        }
1218    }
1219
1220    async fn get_balance(
1221        &self,
1222        mint_url: Option<MintUrl>,
1223        unit: Option<CurrencyUnit>,
1224        state: Option<Vec<State>>,
1225    ) -> Result<u64, DatabaseError> {
1226        // Note: Ideally this would use a server-side SUM aggregation, but PostgREST
1227        // doesn't support aggregate functions directly. We fetch all proofs and sum locally.
1228        let proofs = self.get_proofs(mint_url, unit, state, None).await?;
1229        Ok(proofs.iter().map(|p| p.proof.amount.to_u64()).sum())
1230    }
1231
1232    async fn get_transaction(
1233        &self,
1234        transaction_id: TransactionId,
1235    ) -> Result<Option<Transaction>, DatabaseError> {
1236        let id_hex = transaction_id.to_string();
1237        let path = format!("rest/v1/transactions?id=eq.{}", url_encode(&id_hex));
1238
1239        let (status, text) = self.get_request(&path).await?;
1240
1241        // 404 or empty result means not found
1242        if status == StatusCode::NOT_FOUND {
1243            return Ok(None);
1244        }
1245        if !status.is_success() {
1246            return Err(DatabaseError::Internal(format!(
1247                "get_transaction failed: HTTP {}",
1248                status
1249            )));
1250        }
1251
1252        if let Some(txs) = Self::parse_response::<TransactionTable>(&text)? {
1253            if let Some(t) = txs.into_iter().next() {
1254                return Ok(Some(t.try_into()?));
1255            }
1256        }
1257        Ok(None)
1258    }
1259
1260    async fn list_transactions(
1261        &self,
1262        mint_url: Option<MintUrl>,
1263        direction: Option<TransactionDirection>,
1264        unit: Option<CurrencyUnit>,
1265    ) -> Result<Vec<Transaction>, DatabaseError> {
1266        let mut query = String::from("rest/v1/transactions?select=*");
1267        if let Some(url) = mint_url {
1268            query.push_str(&format!("&mint_url=eq.{}", url_encode(&url.to_string())));
1269        }
1270        if let Some(d) = direction {
1271            query.push_str(&format!("&direction=eq.{}", url_encode(&d.to_string())));
1272        }
1273        if let Some(u) = unit {
1274            query.push_str(&format!("&unit=eq.{}", url_encode(&u.to_string())));
1275        }
1276
1277        let (status, text) = self.get_request(&query).await?;
1278
1279        if !status.is_success() {
1280            return Err(DatabaseError::Internal(format!(
1281                "list_transactions failed: HTTP {}",
1282                status
1283            )));
1284        }
1285
1286        if let Some(txs) = Self::parse_response::<TransactionTable>(&text)? {
1287            txs.into_iter().map(|t| t.try_into()).collect()
1288        } else {
1289            Ok(Vec::new())
1290        }
1291    }
1292
1293    async fn update_proofs(
1294        &self,
1295        added: Vec<ProofInfo>,
1296        removed_ys: Vec<PublicKey>,
1297    ) -> Result<(), DatabaseError> {
1298        // If nothing to do, return early
1299        if added.is_empty() && removed_ys.is_empty() {
1300            return Ok(());
1301        }
1302
1303        // Convert proofs to table format for the RPC call
1304
1305        // Re-do serialization loop properly to allow await
1306        let mut proofs_json: Vec<serde_json::Value> = Vec::with_capacity(added.len());
1307        for p in added {
1308            let mut table: ProofTable = p.try_into()?;
1309
1310            // Encrypt secret
1311            let secret_bytes = table.secret.as_bytes();
1312            let encrypted = self.encrypt(secret_bytes).await?;
1313            table.secret = hex::encode(encrypted);
1314
1315            // Encrypt C
1316            if let Ok(c_bytes) = hex::decode(&table.c) {
1317                let encrypted_c = self.encrypt(&c_bytes).await?;
1318                table.c = hex::encode(encrypted_c);
1319            }
1320
1321            proofs_json.push(serde_json::to_value(&table).map_err(DatabaseError::from)?);
1322        }
1323
1324        // Convert Y values to hex strings
1325        let ys_json: Vec<String> = removed_ys
1326            .iter()
1327            .map(|y| hex::encode(y.to_bytes()))
1328            .collect();
1329
1330        // Try atomic RPC first
1331        let rpc_body = serde_json::json!({
1332            "p_proofs_to_add": proofs_json,
1333            "p_ys_to_remove": ys_json
1334        });
1335
1336        let url = self.join_url("rest/v1/rpc/update_proofs_atomic")?;
1337        let auth_bearer = self.get_auth_bearer().await;
1338
1339        tracing::debug!(
1340            method = "POST",
1341            url = %url,
1342            proofs_count = proofs_json.len(),
1343            remove_count = ys_json.len(),
1344            "Supabase atomic update_proofs RPC"
1345        );
1346
1347        let res = self
1348            .client
1349            .post(url.clone())
1350            .header("apikey", &self.api_key)
1351            .header("Authorization", format!("Bearer {}", auth_bearer))
1352            .header("Content-Type", "application/json")
1353            .json(&rpc_body)
1354            .send()
1355            .await
1356            .map_err(Error::Reqwest)?;
1357
1358        let status = res.status();
1359        let text = res.text().await.map_err(Error::Reqwest)?;
1360
1361        tracing::debug!(
1362            method = "POST",
1363            url = %url,
1364            status = %status,
1365            response_len = text.len(),
1366            "Supabase atomic update_proofs response"
1367        );
1368
1369        if status.is_success() {
1370            return Ok(());
1371        }
1372
1373        Err(DatabaseError::Internal(format!(
1374            "update_proofs_atomic RPC failed: HTTP {}. Ensure migrations have been run.",
1375            status
1376        )))
1377    }
1378
1379    async fn update_proofs_state(
1380        &self,
1381        ys: Vec<PublicKey>,
1382        state: State,
1383    ) -> Result<(), DatabaseError> {
1384        if ys.is_empty() {
1385            return Ok(());
1386        }
1387
1388        let ys_str: Vec<String> = ys.iter().map(|y| hex::encode(y.to_bytes())).collect();
1389        let filter = format!("({})", ys_str.join(","));
1390        let path = format!("rest/v1/proof?y=in.{}", filter);
1391
1392        let (status, response_text) = self
1393            .patch_request(&path, &serde_json::json!({ "state": state.to_string() }))
1394            .await?;
1395
1396        if !status.is_success() {
1397            return Err(DatabaseError::Internal(format!(
1398                "update_proofs_state failed: HTTP {} - {}",
1399                status, response_text
1400            )));
1401        }
1402
1403        Ok(())
1404    }
1405
1406    async fn add_transaction(&self, transaction: Transaction) -> Result<(), DatabaseError> {
1407        let item: TransactionTable = transaction.try_into()?;
1408        let (status, response_text) = self
1409            .post_request("rest/v1/transactions?on_conflict=id,wallet_id", &item)
1410            .await?;
1411
1412        if !status.is_success() {
1413            return Err(DatabaseError::Internal(format!(
1414                "add_transaction failed: HTTP {} - {}",
1415                status, response_text
1416            )));
1417        }
1418        Ok(())
1419    }
1420
1421    async fn remove_transaction(&self, transaction_id: TransactionId) -> Result<(), DatabaseError> {
1422        let id_hex = transaction_id.to_string();
1423        let path = format!("rest/v1/transactions?id=eq.{}", url_encode(&id_hex));
1424
1425        let (status, response_text) = self.delete_request(&path).await?;
1426
1427        if !status.is_success() {
1428            return Err(DatabaseError::Internal(format!(
1429                "remove_transaction failed: HTTP {} - {}",
1430                status, response_text
1431            )));
1432        }
1433        Ok(())
1434    }
1435
1436    async fn update_mint_url(
1437        &self,
1438        old_mint_url: MintUrl,
1439        new_mint_url: MintUrl,
1440    ) -> Result<(), DatabaseError> {
1441        let old_encoded = url_encode(&old_mint_url.to_string());
1442        let update_body = serde_json::json!({ "mint_url": new_mint_url.to_string() });
1443
1444        // Update mint table first (parent table)
1445        let path = format!("rest/v1/mint?mint_url=eq.{}", old_encoded);
1446        let (status, response_text) = self.patch_request(&path, &update_body).await?;
1447        if !status.is_success() {
1448            return Err(DatabaseError::Internal(format!(
1449                "update_mint_url (mint) failed: HTTP {} - {}",
1450                status, response_text
1451            )));
1452        }
1453
1454        // Update keyset table
1455        let path = format!("rest/v1/keyset?mint_url=eq.{}", old_encoded);
1456        let (status, response_text) = self.patch_request(&path, &update_body).await?;
1457        if !status.is_success() {
1458            return Err(DatabaseError::Internal(format!(
1459                "update_mint_url (keyset) failed: HTTP {} - {}",
1460                status, response_text
1461            )));
1462        }
1463
1464        // Update mint_quote table
1465        let path = format!("rest/v1/mint_quote?mint_url=eq.{}", old_encoded);
1466        let (status, response_text) = self.patch_request(&path, &update_body).await?;
1467        if !status.is_success() {
1468            return Err(DatabaseError::Internal(format!(
1469                "update_mint_url (mint_quote) failed: HTTP {} - {}",
1470                status, response_text
1471            )));
1472        }
1473
1474        // Update proof table
1475        let path = format!("rest/v1/proof?mint_url=eq.{}", old_encoded);
1476        let (status, response_text) = self.patch_request(&path, &update_body).await?;
1477        if !status.is_success() {
1478            return Err(DatabaseError::Internal(format!(
1479                "update_mint_url (proof) failed: HTTP {} - {}",
1480                status, response_text
1481            )));
1482        }
1483
1484        // Update transactions table
1485        let path = format!("rest/v1/transactions?mint_url=eq.{}", old_encoded);
1486        let (status, response_text) = self.patch_request(&path, &update_body).await?;
1487        if !status.is_success() {
1488            return Err(DatabaseError::Internal(format!(
1489                "update_mint_url (transactions) failed: HTTP {} - {}",
1490                status, response_text
1491            )));
1492        }
1493
1494        Ok(())
1495    }
1496
1497    async fn increment_keyset_counter(
1498        &self,
1499        keyset_id: &Id,
1500        count: u32,
1501    ) -> Result<u32, DatabaseError> {
1502        // Use Supabase RPC for atomic increment
1503        // This calls the increment_keyset_counter PostgreSQL function
1504        let rpc_body = serde_json::json!({
1505            "p_keyset_id": keyset_id.to_string(),
1506            "p_increment": count as i32
1507        });
1508
1509        let url = self.join_url("rest/v1/rpc/increment_keyset_counter")?;
1510        let auth_bearer = self.get_auth_bearer().await;
1511
1512        tracing::debug!(method = "POST", url = %url, keyset_id = %keyset_id, increment = count, "Supabase RPC request");
1513
1514        let res = self
1515            .client
1516            .post(url.clone())
1517            .header("apikey", &self.api_key)
1518            .header("Authorization", format!("Bearer {}", auth_bearer))
1519            .header("Content-Type", "application/json")
1520            .header("Prefer", "return=representation")
1521            .json(&rpc_body)
1522            .send()
1523            .await
1524            .map_err(Error::Reqwest)?;
1525
1526        let status = res.status();
1527        let text = res.text().await.map_err(Error::Reqwest)?;
1528
1529        tracing::debug!(method = "POST", url = %url, status = %status, response_len = text.len(), "Supabase RPC response");
1530
1531        if status.is_success() {
1532            // RPC returns the new counter value directly
1533            let new_counter: i32 = serde_json::from_str(&text).map_err(|e| {
1534                DatabaseError::Internal(format!("Failed to parse counter response: {}", e))
1535            })?;
1536            return Ok(new_counter as u32);
1537        }
1538
1539        Err(DatabaseError::Internal(format!(
1540            "increment_keyset_counter RPC failed: HTTP {}. Ensure migrations have been run.",
1541            status
1542        )))
1543    }
1544
1545    async fn add_mint(
1546        &self,
1547        mint_url: MintUrl,
1548        mint_info: Option<MintInfo>,
1549    ) -> Result<(), DatabaseError> {
1550        let info_table: MintTable = match mint_info {
1551            Some(info) => MintTable::from_info(mint_url.clone(), info)?,
1552            None => MintTable {
1553                mint_url: mint_url.to_string(),
1554                ..Default::default()
1555            },
1556        };
1557
1558        let (status, response_text) = self
1559            .post_request("rest/v1/mint?on_conflict=mint_url,wallet_id", &info_table)
1560            .await?;
1561
1562        if !status.is_success() {
1563            return Err(DatabaseError::Internal(format!(
1564                "add_mint failed: HTTP {} - {}",
1565                status, response_text
1566            )));
1567        }
1568        Ok(())
1569    }
1570
1571    async fn remove_mint(&self, mint_url: MintUrl) -> Result<(), DatabaseError> {
1572        let path = format!(
1573            "rest/v1/mint?mint_url=eq.{}",
1574            url_encode(&mint_url.to_string())
1575        );
1576        let (status, response_text) = self.delete_request(&path).await?;
1577
1578        if !status.is_success() {
1579            return Err(DatabaseError::Internal(format!(
1580                "remove_mint failed: HTTP {} - {}",
1581                status, response_text
1582            )));
1583        }
1584        Ok(())
1585    }
1586
1587    async fn add_mint_keysets(
1588        &self,
1589        mint_url: MintUrl,
1590        keysets: Vec<KeySetInfo>,
1591    ) -> Result<(), DatabaseError> {
1592        if keysets.is_empty() {
1593            return Ok(());
1594        }
1595
1596        let items: Result<Vec<KeySetTable>, DatabaseError> = keysets
1597            .into_iter()
1598            .map(|k| KeySetTable::from_info(mint_url.clone(), k))
1599            .collect();
1600        let items = items?;
1601
1602        let (status, response_text) = self
1603            .post_request("rest/v1/keyset?on_conflict=id,wallet_id", &items)
1604            .await?;
1605
1606        if !status.is_success() {
1607            return Err(DatabaseError::Internal(format!(
1608                "add_mint_keysets failed: HTTP {} - {}",
1609                status, response_text
1610            )));
1611        }
1612        Ok(())
1613    }
1614
1615    async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), DatabaseError> {
1616        let expected_version = quote.version;
1617        let mut item: MintQuoteTable = quote.try_into()?;
1618
1619        // Try UPDATE first: only matches a row whose stored version equals the
1620        // expected version, bumping it to `expected_version + 1`.
1621        item.version = Some(expected_version.wrapping_add(1) as i32);
1622
1623        let path = format!(
1624            "rest/v1/mint_quote?id=eq.{}&version=eq.{}",
1625            url_encode(&item.id),
1626            expected_version
1627        );
1628
1629        // Use `return=representation` so PostgREST returns the updated rows as JSON.
1630        // An empty array `[]` means the version filter matched nothing — the row either
1631        // doesn't exist yet or was concurrently modified.
1632        let (status, response_text) = self.patch_request_returning(&path, &item).await?;
1633
1634        if !status.is_success() {
1635            return Err(DatabaseError::Internal(format!(
1636                "add_mint_quote failed: HTTP {} - {}",
1637                status, response_text
1638            )));
1639        }
1640
1641        let updated: serde_json::Value =
1642            serde_json::from_str(&response_text).unwrap_or(serde_json::Value::Null);
1643        let row_count = updated.as_array().map(|a| a.len()).unwrap_or(0);
1644
1645        if row_count > 0 {
1646            // PATCH updated an existing row — done.
1647            return Ok(());
1648        }
1649
1650        // No rows updated: either the row doesn't exist yet (INSERT), or it exists
1651        // with a different version (concurrent update). Attempt a plain INSERT that
1652        // stores the quote's own version and fails with 409 on a primary-key conflict.
1653        item.version = Some(expected_version as i32);
1654        let (status, response_text) = self.insert_request("rest/v1/mint_quote", &item).await?;
1655
1656        if status.is_success() {
1657            return Ok(());
1658        }
1659
1660        if status == StatusCode::CONFLICT {
1661            // Row already exists but the version filter above did not match it.
1662            return Err(DatabaseError::ConcurrentUpdate);
1663        }
1664
1665        Err(DatabaseError::Internal(format!(
1666            "add_mint_quote insert failed: HTTP {} - {}",
1667            status, response_text
1668        )))
1669    }
1670
1671    async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), DatabaseError> {
1672        let path = format!("rest/v1/mint_quote?id=eq.{}", url_encode(quote_id));
1673        let (status, response_text) = self.delete_request(&path).await?;
1674
1675        if !status.is_success() {
1676            return Err(DatabaseError::Internal(format!(
1677                "remove_mint_quote failed: HTTP {} - {}",
1678                status, response_text
1679            )));
1680        }
1681        Ok(())
1682    }
1683
1684    async fn add_melt_quote(&self, quote: wallet::MeltQuote) -> Result<(), DatabaseError> {
1685        let expected_version = quote.version;
1686        let mut item: MeltQuoteTable = quote.try_into()?;
1687
1688        // Try UPDATE first: only matches a row whose stored version equals the
1689        // expected version, bumping it to `expected_version + 1`.
1690        item.version = Some(expected_version.wrapping_add(1) as i32);
1691
1692        let path = format!(
1693            "rest/v1/melt_quote?id=eq.{}&version=eq.{}",
1694            url_encode(&item.id),
1695            expected_version
1696        );
1697
1698        // Use `return=representation` so PostgREST returns the updated rows as JSON.
1699        // An empty array `[]` means the version filter matched nothing — the row either
1700        // doesn't exist yet or was concurrently modified.
1701        let (status, response_text) = self.patch_request_returning(&path, &item).await?;
1702
1703        if !status.is_success() {
1704            return Err(DatabaseError::Internal(format!(
1705                "add_melt_quote failed: HTTP {} - {}",
1706                status, response_text
1707            )));
1708        }
1709
1710        let updated: serde_json::Value =
1711            serde_json::from_str(&response_text).unwrap_or(serde_json::Value::Null);
1712        let row_count = updated.as_array().map(|a| a.len()).unwrap_or(0);
1713
1714        if row_count > 0 {
1715            // PATCH updated an existing row — done.
1716            return Ok(());
1717        }
1718
1719        // No rows updated: either the row doesn't exist yet (INSERT), or it exists
1720        // with a different version (concurrent update). Attempt a plain INSERT that
1721        // stores the quote's own version and fails with 409 on a primary-key conflict.
1722        item.version = Some(expected_version as i32);
1723        let (status, response_text) = self.insert_request("rest/v1/melt_quote", &item).await?;
1724
1725        if status.is_success() {
1726            return Ok(());
1727        }
1728
1729        if status == StatusCode::CONFLICT {
1730            // Row already exists but the version filter above did not match it.
1731            return Err(DatabaseError::ConcurrentUpdate);
1732        }
1733
1734        Err(DatabaseError::Internal(format!(
1735            "add_melt_quote insert failed: HTTP {} - {}",
1736            status, response_text
1737        )))
1738    }
1739
1740    async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), DatabaseError> {
1741        let path = format!("rest/v1/melt_quote?id=eq.{}", url_encode(quote_id));
1742        let (status, response_text) = self.delete_request(&path).await?;
1743
1744        if !status.is_success() {
1745            return Err(DatabaseError::Internal(format!(
1746                "remove_melt_quote failed: HTTP {} - {}",
1747                status, response_text
1748            )));
1749        }
1750        Ok(())
1751    }
1752
1753    async fn add_keys(&self, keyset: KeySet) -> Result<(), DatabaseError> {
1754        keyset.verify_id().map_err(DatabaseError::from)?;
1755        let item = KeyTable::from_keyset(&keyset)?;
1756
1757        let (status, response_text) = self
1758            .post_request("rest/v1/key?on_conflict=id,wallet_id", &item)
1759            .await?;
1760
1761        if !status.is_success() {
1762            return Err(DatabaseError::Internal(format!(
1763                "add_keys failed: HTTP {} - {}",
1764                status, response_text
1765            )));
1766        }
1767        Ok(())
1768    }
1769
1770    async fn remove_keys(&self, id: &Id) -> Result<(), DatabaseError> {
1771        let path = format!("rest/v1/key?id=eq.{}", url_encode(&id.to_string()));
1772        let (status, response_text) = self.delete_request(&path).await?;
1773
1774        if !status.is_success() {
1775            return Err(DatabaseError::Internal(format!(
1776                "remove_keys failed: HTTP {} - {}",
1777                status, response_text
1778            )));
1779        }
1780        Ok(())
1781    }
1782
1783    async fn kv_write(
1784        &self,
1785        primary_namespace: &str,
1786        secondary_namespace: &str,
1787        key: &str,
1788        value: &[u8],
1789    ) -> Result<(), DatabaseError> {
1790        // Encrypt value
1791        let encrypted = self.encrypt(value).await?;
1792
1793        let item = KVStoreTable {
1794            primary_namespace: primary_namespace.to_string(),
1795            secondary_namespace: secondary_namespace.to_string(),
1796            key: key.to_string(),
1797            value: hex::encode(encrypted),
1798            _extra: Default::default(),
1799        };
1800
1801        let (status, response_text) = self
1802            .post_request(
1803                "rest/v1/kv_store?on_conflict=primary_namespace,secondary_namespace,key,wallet_id",
1804                &item,
1805            )
1806            .await?;
1807
1808        if !status.is_success() {
1809            return Err(DatabaseError::Internal(format!(
1810                "kv_write failed: HTTP {} - {}",
1811                status, response_text
1812            )));
1813        }
1814        Ok(())
1815    }
1816
1817    async fn kv_remove(
1818        &self,
1819        primary_namespace: &str,
1820        secondary_namespace: &str,
1821        key: &str,
1822    ) -> Result<(), DatabaseError> {
1823        let path = format!(
1824            "rest/v1/kv_store?primary_namespace=eq.{}&secondary_namespace=eq.{}&key=eq.{}",
1825            url_encode(primary_namespace),
1826            url_encode(secondary_namespace),
1827            url_encode(key)
1828        );
1829        let (status, response_text) = self.delete_request(&path).await?;
1830
1831        if !status.is_success() {
1832            return Err(DatabaseError::Internal(format!(
1833                "kv_remove failed: HTTP {} - {}",
1834                status, response_text
1835            )));
1836        }
1837        Ok(())
1838    }
1839
1840    async fn kv_read(
1841        &self,
1842        primary_namespace: &str,
1843        secondary_namespace: &str,
1844        key: &str,
1845    ) -> Result<Option<Vec<u8>>, DatabaseError> {
1846        // Delegate to the KVStoreDatabase impl
1847        KVStoreDatabase::kv_read(self, primary_namespace, secondary_namespace, key).await
1848    }
1849
1850    async fn kv_list(
1851        &self,
1852        primary_namespace: &str,
1853        secondary_namespace: &str,
1854    ) -> Result<Vec<String>, DatabaseError> {
1855        // Delegate to the KVStoreDatabase impl
1856        KVStoreDatabase::kv_list(self, primary_namespace, secondary_namespace).await
1857    }
1858
1859    // ========== Saga methods ==========
1860
1861    async fn add_saga(&self, saga: WalletSaga) -> Result<(), DatabaseError> {
1862        let saga_json = serde_json::to_string(&saga)
1863            .map_err(|e| DatabaseError::Internal(format!("Serialize saga: {e}")))?;
1864
1865        let item = SagaTable {
1866            id: saga.id.to_string(),
1867            data: saga_json,
1868            version: saga.version as i32,
1869            completed: false,
1870            created_at: saga.created_at as i64,
1871            updated_at: saga.updated_at as i64,
1872            _extra: Default::default(),
1873        };
1874
1875        let (status, response_text) = self
1876            .post_request("rest/v1/saga?on_conflict=id,wallet_id", &item)
1877            .await?;
1878
1879        if !status.is_success() {
1880            return Err(DatabaseError::Internal(format!(
1881                "add_saga failed: HTTP {} - {}",
1882                status, response_text
1883            )));
1884        }
1885        Ok(())
1886    }
1887
1888    async fn get_saga(&self, id: &uuid::Uuid) -> Result<Option<WalletSaga>, DatabaseError> {
1889        let path = format!("rest/v1/saga?id=eq.{}", url_encode(&id.to_string()));
1890        let (status, text) = self.get_request(&path).await?;
1891
1892        if !status.is_success() {
1893            return Err(DatabaseError::Internal(format!(
1894                "get_saga failed: HTTP {}",
1895                status
1896            )));
1897        }
1898
1899        if let Some(items) = Self::parse_response::<SagaTable>(&text)? {
1900            if let Some(item) = items.into_iter().next() {
1901                let saga: WalletSaga = serde_json::from_str(&item.data)
1902                    .map_err(|e| DatabaseError::Internal(format!("Deserialize saga: {e}")))?;
1903                return Ok(Some(saga));
1904            }
1905        }
1906        Ok(None)
1907    }
1908
1909    async fn update_saga(&self, saga: WalletSaga) -> Result<bool, DatabaseError> {
1910        let expected_version = saga.version.saturating_sub(1);
1911        let saga_json = serde_json::to_string(&saga)
1912            .map_err(|e| DatabaseError::Internal(format!("Serialize saga: {e}")))?;
1913
1914        let item = SagaTable {
1915            id: saga.id.to_string(),
1916            data: saga_json,
1917            version: saga.version as i32,
1918            completed: false,
1919            created_at: saga.created_at as i64,
1920            updated_at: saga.updated_at as i64,
1921            _extra: Default::default(),
1922        };
1923
1924        // Use PostgREST filtering to only update if version matches (optimistic locking)
1925        let path = format!(
1926            "rest/v1/saga?id=eq.{}&version=eq.{}",
1927            url_encode(&saga.id.to_string()),
1928            expected_version
1929        );
1930
1931        // Use `return=representation` so PostgREST reports which rows were actually
1932        // updated. An empty array means the version filter matched no row (stale
1933        // update / concurrent modification), so the update did not succeed.
1934        let (status, response_text) = self.patch_request_returning(&path, &item).await?;
1935
1936        if !status.is_success() {
1937            return Err(DatabaseError::Internal(format!(
1938                "update_saga failed: HTTP {} - {}",
1939                status, response_text
1940            )));
1941        }
1942
1943        let updated: serde_json::Value =
1944            serde_json::from_str(&response_text).unwrap_or(serde_json::Value::Null);
1945        let row_count = updated.as_array().map(|a| a.len()).unwrap_or(0);
1946
1947        Ok(row_count > 0)
1948    }
1949
1950    async fn delete_saga(&self, id: &uuid::Uuid) -> Result<(), DatabaseError> {
1951        let path = format!("rest/v1/saga?id=eq.{}", url_encode(&id.to_string()));
1952        let (status, response_text) = self.delete_request(&path).await?;
1953
1954        if !status.is_success() {
1955            return Err(DatabaseError::Internal(format!(
1956                "delete_saga failed: HTTP {} - {}",
1957                status, response_text
1958            )));
1959        }
1960        Ok(())
1961    }
1962
1963    async fn get_incomplete_sagas(&self) -> Result<Vec<WalletSaga>, DatabaseError> {
1964        let path = "rest/v1/saga?completed=eq.false&order=created_at.asc";
1965        let (status, text) = self.get_request(path).await?;
1966
1967        if !status.is_success() {
1968            return Err(DatabaseError::Internal(format!(
1969                "get_incomplete_sagas failed: HTTP {}",
1970                status
1971            )));
1972        }
1973
1974        if let Some(items) = Self::parse_response::<SagaTable>(&text)? {
1975            let mut sagas = Vec::new();
1976            for item in items {
1977                let saga: WalletSaga = serde_json::from_str(&item.data)
1978                    .map_err(|e| DatabaseError::Internal(format!("Deserialize saga: {e}")))?;
1979                sagas.push(saga);
1980            }
1981            Ok(sagas)
1982        } else {
1983            Ok(Vec::new())
1984        }
1985    }
1986
1987    // ========== Proof reservation methods ==========
1988
1989    async fn reserve_proofs(
1990        &self,
1991        ys: Vec<PublicKey>,
1992        operation_id: &uuid::Uuid,
1993    ) -> Result<(), DatabaseError> {
1994        let op_id_str = operation_id.to_string();
1995        for y in &ys {
1996            let y_hex = hex::encode(y.to_bytes());
1997
1998            // Update proof state to Reserved with operation_id atomically by filtering on state=Unspent
1999            let update = serde_json::json!({
2000                "state": State::Reserved.to_string(),
2001                "used_by_operation": op_id_str,
2002            });
2003
2004            // We filter on state=Unspent to ensure we only reserve proofs that are currently available.
2005            // This prevents race conditions where two operations try to reserve the same proof.
2006            let patch_path = format!(
2007                "rest/v1/proof?y=eq.{}&state=eq.{}",
2008                url_encode(&y_hex),
2009                url_encode(&State::Unspent.to_string())
2010            );
2011
2012            let (status, response_text) = self.patch_request(&patch_path, &update).await?;
2013
2014            if !status.is_success() {
2015                return Err(DatabaseError::Internal(format!(
2016                    "reserve_proofs: update failed: HTTP {} - {}",
2017                    status, response_text
2018                )));
2019            }
2020
2021            // PostgREST returns 204 No Content for success.
2022            // If the proof was already reserved or spent, the PATCH will succeed (HTTP 204)
2023            // but no rows will be updated. We check if the proof is actually reserved.
2024            let reserved_proofs = self.get_reserved_proofs(operation_id).await?;
2025            if !reserved_proofs.iter().any(|p| p.y == *y) {
2026                return Err(DatabaseError::ProofNotUnspent);
2027            }
2028        }
2029        Ok(())
2030    }
2031
2032    async fn release_proofs(&self, operation_id: &uuid::Uuid) -> Result<(), DatabaseError> {
2033        let op_id_str = operation_id.to_string();
2034
2035        // Update all proofs reserved by this operation back to Unspent
2036        let update = serde_json::json!({
2037            "state": State::Unspent.to_string(),
2038            "used_by_operation": null,
2039        });
2040        let path = format!(
2041            "rest/v1/proof?used_by_operation=eq.{}",
2042            url_encode(&op_id_str)
2043        );
2044        let (status, response_text) = self.patch_request(&path, &update).await?;
2045
2046        if !status.is_success() {
2047            return Err(DatabaseError::Internal(format!(
2048                "release_proofs failed: HTTP {} - {}",
2049                status, response_text
2050            )));
2051        }
2052        Ok(())
2053    }
2054
2055    async fn get_reserved_proofs(
2056        &self,
2057        operation_id: &uuid::Uuid,
2058    ) -> Result<Vec<ProofInfo>, DatabaseError> {
2059        let op_id_str = operation_id.to_string();
2060        let path = format!(
2061            "rest/v1/proof?used_by_operation=eq.{}",
2062            url_encode(&op_id_str)
2063        );
2064        let (status, text) = self.get_request(&path).await?;
2065
2066        if !status.is_success() {
2067            return Err(DatabaseError::Internal(format!(
2068                "get_reserved_proofs failed: HTTP {}",
2069                status
2070            )));
2071        }
2072
2073        if let Some(items) = Self::parse_response::<ProofTable>(&text)? {
2074            let mut proofs = Vec::with_capacity(items.len());
2075            for mut p in items {
2076                self.decrypt_proof_table(&mut p).await;
2077                proofs.push(p.try_into()?);
2078            }
2079            Ok(proofs)
2080        } else {
2081            Ok(Vec::new())
2082        }
2083    }
2084
2085    // ========== Quote reservation methods ==========
2086
2087    async fn reserve_melt_quote(
2088        &self,
2089        quote_id: &str,
2090        operation_id: &uuid::Uuid,
2091    ) -> Result<(), DatabaseError> {
2092        let op_id_str = operation_id.to_string();
2093
2094        let update = serde_json::json!({
2095            "used_by_operation": op_id_str,
2096        });
2097
2098        // Use PostgREST filters on PATCH for atomic reservation.
2099        // We filter for both the quote ID and ensuring it is currently not reserved (used_by_operation IS NULL).
2100        let patch_path = format!(
2101            "rest/v1/melt_quote?id=eq.{}&used_by_operation=is.null",
2102            url_encode(quote_id)
2103        );
2104
2105        let (status, response_text) = self.patch_request(&patch_path, &update).await?;
2106
2107        if !status.is_success() {
2108            return Err(DatabaseError::Internal(format!(
2109                "reserve_melt_quote failed: HTTP {} - {}",
2110                status, response_text
2111            )));
2112        }
2113
2114        // Verify that the quote was actually updated by checking if it's reserved for this operation.
2115        let quote = self.get_melt_quote(quote_id).await?;
2116        match quote {
2117            Some(q) => {
2118                if q.used_by_operation.as_deref() == Some(&op_id_str) {
2119                    Ok(())
2120                } else {
2121                    // Quote exists but was not reserved for us (already reserved by another operation).
2122                    Err(DatabaseError::QuoteAlreadyInUse)
2123                }
2124            }
2125            None => Err(DatabaseError::UnknownQuote),
2126        }
2127    }
2128
2129    async fn release_melt_quote(&self, operation_id: &uuid::Uuid) -> Result<(), DatabaseError> {
2130        let op_id_str = operation_id.to_string();
2131
2132        let update = serde_json::json!({
2133            "used_by_operation": null,
2134        });
2135        let path = format!(
2136            "rest/v1/melt_quote?used_by_operation=eq.{}",
2137            url_encode(&op_id_str)
2138        );
2139        let (status, response_text) = self.patch_request(&path, &update).await?;
2140
2141        if !status.is_success() {
2142            return Err(DatabaseError::Internal(format!(
2143                "release_melt_quote failed: HTTP {} - {}",
2144                status, response_text
2145            )));
2146        }
2147        Ok(())
2148    }
2149
2150    async fn reserve_mint_quote(
2151        &self,
2152        quote_id: &str,
2153        operation_id: &uuid::Uuid,
2154    ) -> Result<(), DatabaseError> {
2155        let op_id_str = operation_id.to_string();
2156
2157        let update = serde_json::json!({
2158            "used_by_operation": op_id_str,
2159        });
2160
2161        // Use PostgREST filters on PATCH for atomic reservation.
2162        // We filter for both the quote ID and ensuring it is currently not reserved (used_by_operation IS NULL).
2163        let patch_path = format!(
2164            "rest/v1/mint_quote?id=eq.{}&used_by_operation=is.null",
2165            url_encode(quote_id)
2166        );
2167
2168        let (status, response_text) = self.patch_request(&patch_path, &update).await?;
2169
2170        if !status.is_success() {
2171            return Err(DatabaseError::Internal(format!(
2172                "reserve_mint_quote failed: HTTP {} - {}",
2173                status, response_text
2174            )));
2175        }
2176
2177        // Verify that the quote was actually updated by checking if it's reserved for this operation.
2178        let quote = self.get_mint_quote(quote_id).await?;
2179        match quote {
2180            Some(q) => {
2181                if q.used_by_operation.as_deref() == Some(&op_id_str) {
2182                    Ok(())
2183                } else {
2184                    // Quote exists but was not reserved for us (already reserved by another operation).
2185                    Err(DatabaseError::QuoteAlreadyInUse)
2186                }
2187            }
2188            None => Err(DatabaseError::UnknownQuote),
2189        }
2190    }
2191
2192    async fn release_mint_quote(&self, operation_id: &uuid::Uuid) -> Result<(), DatabaseError> {
2193        let op_id_str = operation_id.to_string();
2194
2195        let update = serde_json::json!({
2196            "used_by_operation": null,
2197        });
2198        let path = format!(
2199            "rest/v1/mint_quote?used_by_operation=eq.{}",
2200            url_encode(&op_id_str)
2201        );
2202        let (status, response_text) = self.patch_request(&path, &update).await?;
2203
2204        if !status.is_success() {
2205            return Err(DatabaseError::Internal(format!(
2206                "release_mint_quote failed: HTTP {} - {}",
2207                status, response_text
2208            )));
2209        }
2210        Ok(())
2211    }
2212
2213    async fn add_p2pk_key(
2214        &self,
2215        pubkey: &PublicKey,
2216        derivation_path: DerivationPath,
2217        derivation_index: u32,
2218    ) -> Result<(), DatabaseError> {
2219        let created_time = SystemTime::now()
2220            .duration_since(UNIX_EPOCH)
2221            .map_err(|e| DatabaseError::Internal(format!("SystemTime error: {}", e)))?
2222            .as_secs();
2223
2224        let item = P2PKSigningKeyTable {
2225            pubkey: hex::encode(pubkey.to_bytes()),
2226            derivation_index: derivation_index as i64,
2227            derivation_path: derivation_path.to_string(),
2228            created_time: created_time as i64,
2229            _extra: Default::default(),
2230        };
2231
2232        let (status, response_text) = self
2233            .post_request(
2234                "rest/v1/p2pk_signing_key?on_conflict=pubkey,wallet_id",
2235                &item,
2236            )
2237            .await?;
2238
2239        if !status.is_success() {
2240            return Err(DatabaseError::Internal(format!(
2241                "add_p2pk_key failed: HTTP {} - {}",
2242                status, response_text
2243            )));
2244        }
2245        Ok(())
2246    }
2247
2248    async fn get_p2pk_key(
2249        &self,
2250        pubkey: &PublicKey,
2251    ) -> Result<Option<wallet::P2PKSigningKey>, DatabaseError> {
2252        let path = format!(
2253            "rest/v1/p2pk_signing_key?pubkey=eq.{}",
2254            url_encode(&hex::encode(pubkey.to_bytes()))
2255        );
2256        let (status, text) = self.get_request(&path).await?;
2257
2258        if status == StatusCode::NOT_FOUND {
2259            return Ok(None);
2260        }
2261        if !status.is_success() {
2262            return Err(DatabaseError::Internal(format!(
2263                "get_p2pk_key failed: HTTP {}",
2264                status
2265            )));
2266        }
2267
2268        if let Some(rows) = Self::parse_response::<P2PKSigningKeyTable>(&text)? {
2269            if let Some(row) = rows.into_iter().next() {
2270                return Ok(Some(row.try_into()?));
2271            }
2272        }
2273        Ok(None)
2274    }
2275
2276    async fn list_p2pk_keys(&self) -> Result<Vec<wallet::P2PKSigningKey>, DatabaseError> {
2277        let path = "rest/v1/p2pk_signing_key?order=derivation_index.desc";
2278        let (status, text) = self.get_request(path).await?;
2279
2280        if !status.is_success() {
2281            return Err(DatabaseError::Internal(format!(
2282                "list_p2pk_keys failed: HTTP {}",
2283                status
2284            )));
2285        }
2286
2287        if let Some(rows) = Self::parse_response::<P2PKSigningKeyTable>(&text)? {
2288            rows.into_iter()
2289                .map(|row| row.try_into())
2290                .collect::<Result<Vec<_>, _>>()
2291        } else {
2292            Ok(Vec::new())
2293        }
2294    }
2295
2296    async fn latest_p2pk(&self) -> Result<Option<wallet::P2PKSigningKey>, DatabaseError> {
2297        let path = "rest/v1/p2pk_signing_key?order=derivation_index.desc&limit=1";
2298        let (status, text) = self.get_request(path).await?;
2299
2300        if !status.is_success() {
2301            return Err(DatabaseError::Internal(format!(
2302                "latest_p2pk failed: HTTP {}",
2303                status
2304            )));
2305        }
2306
2307        if let Some(rows) = Self::parse_response::<P2PKSigningKeyTable>(&text)? {
2308            if let Some(row) = rows.into_iter().next() {
2309                return Ok(Some(row.try_into()?));
2310            }
2311        }
2312        Ok(None)
2313    }
2314}
2315
2316// Data Structures for Supabase Tables (Serde)
2317
2318// Note: All table structs use `deny_unknown_fields = false` (serde default) to allow
2319// extra columns added by other applications (e.g., user_id, opt_version) without breaking.
2320
2321#[derive(Debug, Serialize, Deserialize)]
2322struct EncryptionMetadataTable {
2323    version: i64,
2324    kdf: String,
2325    salt: String,
2326    scrypt_log_n: i64,
2327    scrypt_r: i64,
2328    scrypt_p: i64,
2329    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2330    #[serde(default, skip_serializing, flatten)]
2331    _extra: serde_json::Map<String, serde_json::Value>,
2332}
2333
2334impl EncryptionMetadataTable {
2335    fn new() -> Self {
2336        let mut salt = [0u8; ENCRYPTION_SALT_BYTES];
2337        OsRng.fill_bytes(&mut salt);
2338
2339        Self {
2340            version: ENCRYPTION_METADATA_VERSION,
2341            kdf: ENCRYPTION_KDF.to_string(),
2342            salt: hex::encode(salt),
2343            scrypt_log_n: SCRYPT_LOG_N as i64,
2344            scrypt_r: SCRYPT_R as i64,
2345            scrypt_p: SCRYPT_P as i64,
2346            _extra: Default::default(),
2347        }
2348    }
2349}
2350
2351#[derive(Debug, Serialize, Deserialize)]
2352struct KVStoreTable {
2353    primary_namespace: String,
2354    secondary_namespace: String,
2355    key: String,
2356    value: String, // hex encoded bytea
2357    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2358    #[serde(default, skip_serializing, flatten)]
2359    _extra: serde_json::Map<String, serde_json::Value>,
2360}
2361
2362#[derive(Debug, Serialize, Deserialize, Default)]
2363struct MintTable {
2364    mint_url: String,
2365    name: Option<String>,
2366    pubkey: Option<String>,
2367    version: Option<String>,
2368    description: Option<String>,
2369    description_long: Option<String>,
2370    contact: Option<String>,
2371    nuts: Option<String>,
2372    icon_url: Option<String>,
2373    urls: Option<String>,
2374    motd: Option<String>,
2375    mint_time: Option<i64>,
2376    tos_url: Option<String>,
2377    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2378    #[serde(default, skip_serializing, flatten)]
2379    _extra: serde_json::Map<String, serde_json::Value>,
2380}
2381
2382impl MintTable {
2383    fn from_info(mint_url: MintUrl, info: MintInfo) -> Result<Self, DatabaseError> {
2384        Ok(Self {
2385            mint_url: mint_url.to_string(),
2386            name: info.name,
2387            pubkey: info.pubkey.map(|p| hex::encode(p.to_bytes())),
2388            version: info
2389                .version
2390                .map(|v| serde_json::to_string(&v))
2391                .transpose()?,
2392            description: info.description,
2393            description_long: info.description_long,
2394            contact: info
2395                .contact
2396                .map(|c| serde_json::to_string(&c))
2397                .transpose()?,
2398            nuts: Some(serde_json::to_string(&info.nuts)?),
2399            icon_url: info.icon_url,
2400            urls: info.urls.map(|u| serde_json::to_string(&u)).transpose()?,
2401            motd: info.motd,
2402            mint_time: info.time.map(|t| t as i64),
2403            tos_url: info.tos_url,
2404            _extra: Default::default(),
2405        })
2406    }
2407}
2408
2409impl TryInto<MintInfo> for MintTable {
2410    type Error = DatabaseError;
2411    fn try_into(self) -> Result<MintInfo, Self::Error> {
2412        // Helper to filter empty strings before JSON parsing
2413        fn parse_json_field<T: serde::de::DeserializeOwned>(
2414            field: Option<String>,
2415        ) -> Result<Option<T>, serde_json::Error> {
2416            match field {
2417                Some(s) if !s.trim().is_empty() => {
2418                    let s = s.trim();
2419                    match serde_json::from_str::<T>(s) {
2420                        Ok(v) => Ok(Some(v)),
2421                        Err(e) => {
2422                            // If it fails to parse, try wrapping it in quotes in case it's a bare string
2423                            // but only if it doesn't already look like a JSON object or array
2424                            if !s.starts_with('{') && !s.starts_with('[') && !s.starts_with('"') {
2425                                let quoted = format!("\"{}\"", s);
2426                                if let Ok(v) = serde_json::from_str::<T>(&quoted) {
2427                                    return Ok(Some(v));
2428                                }
2429                            }
2430                            Err(e)
2431                        }
2432                    }
2433                }
2434                _ => Ok(None),
2435            }
2436        }
2437
2438        Ok(MintInfo {
2439            name: self.name,
2440            pubkey: self
2441                .pubkey
2442                .map(|p| {
2443                    PublicKey::from_hex(&p)
2444                        .map_err(|_| DatabaseError::Internal("Invalid pubkey hex".into()))
2445                })
2446                .transpose()?,
2447            version: parse_json_field(self.version)?,
2448            description: self.description,
2449            description_long: self.description_long,
2450            contact: parse_json_field(self.contact)?,
2451            nuts: parse_json_field(self.nuts)?.unwrap_or_default(),
2452            icon_url: self.icon_url,
2453            urls: parse_json_field(self.urls)?,
2454            motd: self.motd,
2455            time: self.mint_time.map(|t| t as u64),
2456            tos_url: self.tos_url,
2457        })
2458    }
2459}
2460
2461#[derive(Debug, Serialize, Deserialize)]
2462struct KeySetTable {
2463    mint_url: String,
2464    id: String,
2465    unit: String,
2466    active: bool,
2467    input_fee_ppk: i64,
2468    final_expiry: Option<i64>,
2469    keyset_u32: Option<i64>,
2470    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2471    #[serde(default, skip_serializing, flatten)]
2472    _extra: serde_json::Map<String, serde_json::Value>,
2473}
2474
2475impl KeySetTable {
2476    fn from_info(mint_url: MintUrl, info: KeySetInfo) -> Result<Self, DatabaseError> {
2477        Ok(Self {
2478            mint_url: mint_url.to_string(),
2479            id: info.id.to_string(),
2480            unit: info.unit.to_string(),
2481            active: info.active,
2482            input_fee_ppk: info.input_fee_ppk as i64,
2483            final_expiry: info.final_expiry.map(|v| v as i64),
2484            keyset_u32: Some(u32::from(info.id) as i64),
2485            _extra: Default::default(),
2486        })
2487    }
2488}
2489
2490impl TryInto<KeySetInfo> for KeySetTable {
2491    type Error = DatabaseError;
2492    fn try_into(self) -> Result<KeySetInfo, Self::Error> {
2493        Ok(KeySetInfo {
2494            id: Id::from_str(&self.id).map_err(|_| DatabaseError::InvalidKeysetId)?,
2495            unit: CurrencyUnit::from_str(&self.unit)
2496                .map_err(|_| DatabaseError::Internal("Invalid unit".into()))?,
2497            active: self.active,
2498            input_fee_ppk: self.input_fee_ppk as u64,
2499            final_expiry: self.final_expiry.map(|v| v as u64),
2500        })
2501    }
2502}
2503
2504#[derive(Debug, Serialize, Deserialize)]
2505struct P2PKSigningKeyTable {
2506    pubkey: String,
2507    derivation_index: i64,
2508    derivation_path: String,
2509    created_time: i64,
2510    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2511    #[serde(default, skip_serializing, flatten)]
2512    _extra: serde_json::Map<String, serde_json::Value>,
2513}
2514
2515impl TryInto<wallet::P2PKSigningKey> for P2PKSigningKeyTable {
2516    type Error = DatabaseError;
2517    fn try_into(self) -> Result<wallet::P2PKSigningKey, Self::Error> {
2518        Ok(wallet::P2PKSigningKey {
2519            pubkey: PublicKey::from_hex(&self.pubkey)
2520                .map_err(|_| DatabaseError::Internal("Invalid pubkey hex".into()))?,
2521            derivation_path: DerivationPath::from_str(&self.derivation_path)
2522                .map_err(|_| DatabaseError::Internal("Invalid derivation path".into()))?,
2523            derivation_index: self.derivation_index as u32,
2524            created_time: self.created_time as u64,
2525        })
2526    }
2527}
2528
2529#[derive(Debug, Serialize, Deserialize)]
2530struct KeyTable {
2531    id: String,
2532    keys: String, // json string
2533    keyset_u32: Option<i64>,
2534    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2535    #[serde(default, skip_serializing, flatten)]
2536    _extra: serde_json::Map<String, serde_json::Value>,
2537}
2538
2539impl KeyTable {
2540    fn from_keyset(keyset: &KeySet) -> Result<Self, DatabaseError> {
2541        Ok(Self {
2542            id: keyset.id.to_string(),
2543            keys: serde_json::to_string(&keyset.keys)?,
2544            keyset_u32: Some(u32::from(keyset.id) as i64),
2545            _extra: Default::default(),
2546        })
2547    }
2548}
2549
2550impl TryInto<Keys> for KeyTable {
2551    type Error = DatabaseError;
2552    fn try_into(self) -> Result<Keys, Self::Error> {
2553        Ok(serde_json::from_str(&self.keys)?)
2554    }
2555}
2556
2557#[derive(Debug, Serialize, Deserialize)]
2558struct MintQuoteTable {
2559    id: String,
2560    mint_url: String,
2561    amount: i64,
2562    unit: String,
2563    request: Option<String>,
2564    state: String,
2565    expiry: i64,
2566    secret_key: Option<String>,
2567    payment_method: String,
2568    amount_issued: i64,
2569    amount_paid: i64,
2570    #[serde(default)]
2571    used_by_operation: Option<String>,
2572    #[serde(default)]
2573    version: Option<i32>,
2574    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2575    #[serde(default, skip_serializing, flatten)]
2576    _extra: serde_json::Map<String, serde_json::Value>,
2577}
2578
2579impl TryInto<MintQuote> for MintQuoteTable {
2580    type Error = DatabaseError;
2581    fn try_into(self) -> Result<MintQuote, Self::Error> {
2582        Ok(MintQuote {
2583            id: self.id,
2584            mint_url: MintUrl::from_str(&self.mint_url)
2585                .map_err(|e| DatabaseError::Internal(e.to_string()))?,
2586            amount: Some(cdk_common::Amount::from(self.amount as u64)),
2587            unit: CurrencyUnit::from_str(&self.unit)
2588                .map_err(|_| DatabaseError::Internal("Invalid unit".into()))?,
2589            request: self
2590                .request
2591                .ok_or(DatabaseError::Internal("Missing request".into()))?,
2592            state: cdk_common::nuts::MintQuoteState::from_str(&self.state)
2593                .map_err(|_| DatabaseError::Internal("Invalid state".into()))?,
2594            expiry: self.expiry as u64,
2595            secret_key: self
2596                .secret_key
2597                .map(|k| cdk_common::nuts::SecretKey::from_str(&k))
2598                .transpose()
2599                .map_err(|_| DatabaseError::Internal("Invalid secret key".into()))?,
2600            payment_method: cdk_common::PaymentMethod::from_str(&self.payment_method)
2601                .map_err(|_| DatabaseError::Internal("Invalid payment method".into()))?,
2602            amount_issued: cdk_common::Amount::from(self.amount_issued as u64),
2603            amount_paid: cdk_common::Amount::from(self.amount_paid as u64),
2604            estimated_blocks: None,
2605            used_by_operation: self.used_by_operation,
2606            version: self.version.unwrap_or(0) as u32,
2607        })
2608    }
2609}
2610
2611impl TryFrom<MintQuote> for MintQuoteTable {
2612    type Error = DatabaseError;
2613    fn try_from(q: MintQuote) -> Result<Self, Self::Error> {
2614        Ok(Self {
2615            id: q.id,
2616            mint_url: q.mint_url.to_string(),
2617            amount: q.amount.map(|a| a.to_u64() as i64).unwrap_or(0),
2618            unit: q.unit.to_string(),
2619            request: Some(q.request),
2620            state: q.state.to_string(),
2621            expiry: q.expiry as i64,
2622            secret_key: q.secret_key.map(|k| k.to_string()),
2623            payment_method: q.payment_method.to_string(),
2624            amount_issued: q.amount_issued.to_u64() as i64,
2625            amount_paid: q.amount_paid.to_u64() as i64,
2626            used_by_operation: q.used_by_operation,
2627            version: Some(q.version as i32),
2628            _extra: Default::default(),
2629        })
2630    }
2631}
2632
2633#[derive(Debug, Serialize, Deserialize)]
2634struct MeltQuoteTable {
2635    id: String,
2636    unit: String,
2637    amount: i64,
2638    request: String,
2639    fee_reserve: i64,
2640    state: String,
2641    expiry: i64,
2642    payment_proof: Option<String>,
2643    payment_method: String,
2644    #[serde(default)]
2645    estimated_blocks: Option<i64>,
2646    #[serde(default)]
2647    fee_index: Option<i64>,
2648    #[serde(default)]
2649    mint_url: Option<String>,
2650    #[serde(default)]
2651    used_by_operation: Option<String>,
2652    #[serde(default)]
2653    version: Option<i32>,
2654    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2655    #[serde(default, skip_serializing, flatten)]
2656    _extra: serde_json::Map<String, serde_json::Value>,
2657}
2658
2659impl TryInto<wallet::MeltQuote> for MeltQuoteTable {
2660    type Error = DatabaseError;
2661    fn try_into(self) -> Result<wallet::MeltQuote, Self::Error> {
2662        Ok(wallet::MeltQuote {
2663            id: self.id,
2664            mint_url: self
2665                .mint_url
2666                .as_deref()
2667                .map(cdk_common::mint_url::MintUrl::from_str)
2668                .transpose()
2669                .map_err(|_| DatabaseError::Internal("Invalid mint URL".into()))?,
2670            unit: CurrencyUnit::from_str(&self.unit)
2671                .map_err(|_| DatabaseError::Internal("Invalid unit".into()))?,
2672            amount: cdk_common::Amount::from(self.amount as u64),
2673            request: self.request,
2674            fee_reserve: cdk_common::Amount::from(self.fee_reserve as u64),
2675            state: cdk_common::nuts::MeltQuoteState::from_str(&self.state)
2676                .map_err(|_| DatabaseError::Internal("Invalid state".into()))?,
2677            expiry: self.expiry as u64,
2678            payment_proof: self.payment_proof,
2679            payment_method: cdk_common::PaymentMethod::from_str(&self.payment_method)
2680                .map_err(|_| DatabaseError::Internal("Invalid payment method".into()))?,
2681            estimated_blocks: self
2682                .estimated_blocks
2683                .map(u32::try_from)
2684                .transpose()
2685                .map_err(|_| DatabaseError::Internal("Invalid estimated_blocks".into()))?,
2686            fee_index: self
2687                .fee_index
2688                .map(u32::try_from)
2689                .transpose()
2690                .map_err(|_| DatabaseError::Internal("Invalid fee_index".into()))?,
2691            used_by_operation: self.used_by_operation,
2692            version: self.version.unwrap_or(0) as u32,
2693        })
2694    }
2695}
2696
2697impl TryFrom<wallet::MeltQuote> for MeltQuoteTable {
2698    type Error = DatabaseError;
2699    fn try_from(q: wallet::MeltQuote) -> Result<Self, Self::Error> {
2700        Ok(Self {
2701            id: q.id,
2702            mint_url: q.mint_url.map(|u| u.to_string()),
2703            unit: q.unit.to_string(),
2704            amount: q.amount.to_u64() as i64,
2705            request: q.request,
2706            fee_reserve: q.fee_reserve.to_u64() as i64,
2707            state: q.state.to_string(),
2708            expiry: q.expiry as i64,
2709            payment_proof: q.payment_proof,
2710            payment_method: q.payment_method.to_string(),
2711            estimated_blocks: q.estimated_blocks.map(i64::from),
2712            fee_index: q.fee_index.map(i64::from),
2713            used_by_operation: q.used_by_operation,
2714            version: Some(q.version as i32),
2715            _extra: Default::default(),
2716        })
2717    }
2718}
2719
2720#[derive(Debug, Serialize, Deserialize)]
2721struct ProofTable {
2722    y: String,
2723    mint_url: String,
2724    state: String,
2725    spending_condition: Option<String>,
2726    unit: String,
2727    amount: i64,
2728    keyset_id: String,
2729    secret: String,
2730    c: String,
2731    witness: Option<String>,
2732    dleq_e: Option<String>,
2733    dleq_s: Option<String>,
2734    dleq_r: Option<String>,
2735    #[serde(default)]
2736    used_by_operation: Option<String>,
2737    #[serde(default)]
2738    created_by_operation: Option<String>,
2739    #[serde(default)]
2740    p2pk_e: Option<String>,
2741    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2742    #[serde(default, skip_serializing, flatten)]
2743    _extra: serde_json::Map<String, serde_json::Value>,
2744}
2745
2746impl TryInto<ProofInfo> for ProofTable {
2747    type Error = DatabaseError;
2748    fn try_into(self) -> Result<ProofInfo, Self::Error> {
2749        let y = PublicKey::from_hex(&self.y)
2750            .map_err(|_| DatabaseError::Internal("Invalid y".into()))?;
2751        let c = PublicKey::from_hex(&self.c)
2752            .map_err(|_| DatabaseError::Internal("Invalid c".into()))?;
2753        Ok(ProofInfo {
2754            y,
2755            mint_url: MintUrl::from_str(&self.mint_url)
2756                .map_err(|e| DatabaseError::Internal(e.to_string()))?,
2757            state: cdk_common::nuts::State::from_str(&self.state)
2758                .map_err(|_| DatabaseError::Internal("Invalid state".into()))?,
2759            spending_condition: self
2760                .spending_condition
2761                .filter(|s| !s.trim().is_empty())
2762                .map(|s| serde_json::from_str(&s))
2763                .transpose()?,
2764            unit: CurrencyUnit::from_str(&self.unit)
2765                .map_err(|_| DatabaseError::Internal("Invalid unit".into()))?,
2766            proof: cdk_common::Proof {
2767                amount: cdk_common::Amount::from(self.amount as u64),
2768                keyset_id: Id::from_str(&self.keyset_id)
2769                    .map_err(|_| DatabaseError::InvalidKeysetId)?,
2770                secret: Secret::from_str(&self.secret)
2771                    .map_err(|_| DatabaseError::Internal("Invalid secret".into()))?,
2772                c,
2773                witness: self
2774                    .witness
2775                    .filter(|w| !w.trim().is_empty())
2776                    .map(|w| serde_json::from_str(&w))
2777                    .transpose()?,
2778                dleq: match (self.dleq_e, self.dleq_s, self.dleq_r) {
2779                    (Some(e), Some(s), Some(r)) => Some(cdk_common::ProofDleq {
2780                        e: cdk_common::SecretKey::from_hex(&e)
2781                            .map_err(|_| DatabaseError::Internal("Invalid dleq_e".into()))?,
2782                        s: cdk_common::SecretKey::from_hex(&s)
2783                            .map_err(|_| DatabaseError::Internal("Invalid dleq_s".into()))?,
2784                        r: cdk_common::SecretKey::from_hex(&r)
2785                            .map_err(|_| DatabaseError::Internal("Invalid dleq_r".into()))?,
2786                    }),
2787                    _ => None,
2788                },
2789                p2pk_e: self
2790                    .p2pk_e
2791                    .map(|s| PublicKey::from_hex(&s))
2792                    .transpose()
2793                    .map_err(|_| DatabaseError::Internal("Invalid p2pk_e".into()))?,
2794            },
2795            used_by_operation: self
2796                .used_by_operation
2797                .map(|s| uuid::Uuid::parse_str(&s))
2798                .transpose()
2799                .map_err(|_| DatabaseError::Internal("Invalid used_by_operation uuid".into()))?,
2800            created_by_operation: self
2801                .created_by_operation
2802                .map(|s| uuid::Uuid::parse_str(&s))
2803                .transpose()
2804                .map_err(|_| DatabaseError::Internal("Invalid created_by_operation uuid".into()))?,
2805        })
2806    }
2807}
2808
2809impl TryFrom<ProofInfo> for ProofTable {
2810    type Error = DatabaseError;
2811    fn try_from(p: ProofInfo) -> Result<Self, Self::Error> {
2812        Ok(Self {
2813            y: hex::encode(p.y.to_bytes()),
2814            mint_url: p.mint_url.to_string(),
2815            state: p.state.to_string(),
2816            spending_condition: p
2817                .spending_condition
2818                .map(|s| serde_json::to_string(&s))
2819                .transpose()?,
2820            unit: p.unit.to_string(),
2821            amount: p.proof.amount.to_u64() as i64,
2822            keyset_id: p.proof.keyset_id.to_string(),
2823            secret: p.proof.secret.to_string(),
2824            c: hex::encode(p.proof.c.to_bytes()),
2825            witness: p
2826                .proof
2827                .witness
2828                .map(|w| serde_json::to_string(&w))
2829                .transpose()?,
2830            dleq_e: p
2831                .proof
2832                .dleq
2833                .as_ref()
2834                .map(|d| hex::encode(d.e.to_secret_bytes())),
2835            dleq_s: p
2836                .proof
2837                .dleq
2838                .as_ref()
2839                .map(|d| hex::encode(d.s.to_secret_bytes())),
2840            dleq_r: p
2841                .proof
2842                .dleq
2843                .as_ref()
2844                .map(|d| hex::encode(d.r.to_secret_bytes())),
2845            used_by_operation: p.used_by_operation.map(|u| u.to_string()),
2846            created_by_operation: p.created_by_operation.map(|u| u.to_string()),
2847            p2pk_e: p.proof.p2pk_e.map(|e| hex::encode(e.to_bytes())),
2848            _extra: Default::default(),
2849        })
2850    }
2851}
2852
2853#[derive(Debug, Serialize, Deserialize)]
2854struct TransactionTable {
2855    id: String,
2856    mint_url: String,
2857    direction: String,
2858    unit: String,
2859    amount: i64,
2860    fee: i64,
2861    ys: Option<Vec<String>>,
2862    timestamp: i64,
2863    memo: Option<String>,
2864    metadata: Option<String>,
2865    quote_id: Option<String>,
2866    payment_request: Option<String>,
2867    payment_proof: Option<String>,
2868    payment_method: Option<String>,
2869    #[serde(default)]
2870    saga_id: Option<String>,
2871    /// Extra fields from other applications (captured during deserialization, ignored during serialization)
2872    #[serde(default, skip_serializing, flatten)]
2873    _extra: serde_json::Map<String, serde_json::Value>,
2874}
2875
2876impl TryInto<Transaction> for TransactionTable {
2877    type Error = DatabaseError;
2878    fn try_into(self) -> Result<Transaction, Self::Error> {
2879        let id_bytes = hex::decode(&self.id)
2880            .map_err(|_| DatabaseError::Internal("Invalid transaction id hex".into()))?;
2881        let _id_arr: [u8; 32] = id_bytes
2882            .try_into()
2883            .map_err(|_| DatabaseError::Internal("Invalid transaction id len".into()))?;
2884
2885        let ys = match self.ys {
2886            Some(strs) => strs
2887                .into_iter()
2888                .map(|s| {
2889                    PublicKey::from_hex(&s)
2890                        .map_err(|_| DatabaseError::Internal("Invalid y hex".into()))
2891                })
2892                .collect::<Result<Vec<_>, _>>()?,
2893            None => vec![],
2894        };
2895
2896        Ok(Transaction {
2897            mint_url: MintUrl::from_str(&self.mint_url)
2898                .map_err(|e| DatabaseError::Internal(e.to_string()))?,
2899            direction: TransactionDirection::from_str(&self.direction)
2900                .map_err(|_| DatabaseError::Internal("Invalid direction".into()))?,
2901            unit: CurrencyUnit::from_str(&self.unit)
2902                .map_err(|_| DatabaseError::Internal("Invalid unit".into()))?,
2903            amount: cdk_common::Amount::from(self.amount as u64),
2904            fee: cdk_common::Amount::from(self.fee as u64),
2905            ys,
2906            timestamp: self.timestamp as u64,
2907            memo: self.memo,
2908            metadata: self
2909                .metadata
2910                .filter(|m| !m.trim().is_empty())
2911                .map(|m| serde_json::from_str(&m))
2912                .transpose()?
2913                .unwrap_or_default(),
2914            quote_id: self.quote_id,
2915            payment_request: self.payment_request,
2916            payment_proof: self.payment_proof,
2917            payment_method: self
2918                .payment_method
2919                .map(|p| cdk_common::PaymentMethod::from_str(&p))
2920                .transpose()
2921                .map_err(|_| DatabaseError::Internal("Invalid payment method".into()))?,
2922            saga_id: self
2923                .saga_id
2924                .map(|s| uuid::Uuid::parse_str(&s))
2925                .transpose()
2926                .map_err(|_| DatabaseError::Internal("Invalid saga_id uuid".into()))?,
2927        })
2928    }
2929}
2930
2931impl TryFrom<Transaction> for TransactionTable {
2932    type Error = DatabaseError;
2933    fn try_from(t: Transaction) -> Result<Self, Self::Error> {
2934        Ok(Self {
2935            id: t.id().to_string(),
2936            mint_url: t.mint_url.to_string(),
2937            direction: t.direction.to_string(),
2938            unit: t.unit.to_string(),
2939            amount: t.amount.to_u64() as i64,
2940            fee: t.fee.to_u64() as i64,
2941            ys: Some(t.ys.iter().map(|y| hex::encode(y.to_bytes())).collect()),
2942            timestamp: t.timestamp as i64,
2943            memo: t.memo,
2944            metadata: if t.metadata.is_empty() {
2945                None
2946            } else {
2947                Some(serde_json::to_string(&t.metadata)?)
2948            },
2949            quote_id: t.quote_id,
2950            payment_request: t.payment_request,
2951            payment_proof: t.payment_proof,
2952            payment_method: t.payment_method.map(|p| p.to_string()),
2953            saga_id: t.saga_id.map(|u| u.to_string()),
2954            _extra: Default::default(),
2955        })
2956    }
2957}
2958
2959#[derive(Debug, Serialize, Deserialize)]
2960struct SagaTable {
2961    id: String,
2962    data: String, // JSON-serialized WalletSaga
2963    version: i32,
2964    completed: bool,
2965    created_at: i64,
2966    updated_at: i64,
2967    /// Extra fields from other applications
2968    #[serde(default, skip_serializing, flatten)]
2969    _extra: serde_json::Map<String, serde_json::Value>,
2970}
2971
2972/// Response from Supabase Auth sign-up/sign-in
2973#[derive(Debug, Deserialize, Serialize, Clone)]
2974pub struct SupabaseAuthResponse {
2975    /// Access token
2976    pub access_token: String,
2977    /// Token type
2978    pub token_type: String,
2979    /// Expires in
2980    pub expires_in: Option<i64>,
2981    /// Refresh token
2982    pub refresh_token: Option<String>,
2983    /// User
2984    pub user: serde_json::Value,
2985}
2986
2987/// Helper for Supabase Authentication
2988#[derive(Debug)]
2989pub struct SupabaseAuth;
2990
2991impl SupabaseAuth {
2992    /// Sign up a new user with email and password
2993    pub async fn signup(
2994        url: &Url,
2995        api_key: &str,
2996        email: &str,
2997        password: &str,
2998    ) -> Result<SupabaseAuthResponse, Error> {
2999        let auth_url = url
3000            .join("auth/v1/signup")
3001            .map_err(|e| Error::Supabase(format!("Invalid auth URL: {}", e)))?;
3002
3003        let client = Client::new();
3004        let body = serde_json::json!({
3005            "email": email,
3006            "password": password
3007        });
3008
3009        let response = client
3010            .post(auth_url)
3011            .header("apikey", api_key)
3012            .header("Content-Type", "application/json")
3013            .json(&body)
3014            .send()
3015            .await
3016            .map_err(Error::Reqwest)?;
3017
3018        let status = response.status();
3019        if !status.is_success() {
3020            let text = response.text().await.unwrap_or_default();
3021            return Err(Error::Supabase(format!(
3022                "Supabase signup failed: HTTP {} - {}",
3023                status, text
3024            )));
3025        }
3026
3027        response.json().await.map_err(Error::Reqwest)
3028    }
3029
3030    /// Sign in a user with email and password
3031    pub async fn signin(
3032        url: &Url,
3033        api_key: &str,
3034        email: &str,
3035        password: &str,
3036    ) -> Result<SupabaseAuthResponse, Error> {
3037        let auth_url = url
3038            .join("auth/v1/token?grant_type=password")
3039            .map_err(|e| Error::Supabase(format!("Invalid auth URL: {}", e)))?;
3040
3041        let client = Client::new();
3042        let body = serde_json::json!({
3043            "email": email,
3044            "password": password
3045        });
3046
3047        let response = client
3048            .post(auth_url)
3049            .header("apikey", api_key)
3050            .header("Content-Type", "application/json")
3051            .json(&body)
3052            .send()
3053            .await
3054            .map_err(Error::Reqwest)?;
3055
3056        let status = response.status();
3057        if !status.is_success() {
3058            let text = response.text().await.unwrap_or_default();
3059            return Err(Error::Supabase(format!(
3060                "Supabase signin failed: HTTP {} - {}",
3061                status, text
3062            )));
3063        }
3064
3065        response.json().await.map_err(Error::Reqwest)
3066    }
3067}
3068
3069#[cfg(test)]
3070mod tests {
3071    use std::time::{SystemTime, UNIX_EPOCH};
3072
3073    use bitcoin::base64::engine::general_purpose;
3074    use bitcoin::base64::Engine as _;
3075    use cdk_common::database::{Error as DatabaseError, WalletDatabase};
3076    #[cfg(feature = "integration-tests")]
3077    use cdk_common::wallet_db_test;
3078    use mockito::Matcher;
3079    use serde_json::json;
3080    use url::Url;
3081
3082    use super::*;
3083    #[cfg(feature = "integration-tests")]
3084    use crate::Error;
3085    use crate::SupabaseWalletDatabase;
3086
3087    fn extract_schema_versions(schema_sql: &str) -> Vec<u32> {
3088        const PREFIX: &str = "VALUES ('schema_version', '";
3089
3090        schema_sql
3091            .lines()
3092            .filter_map(|line| {
3093                let start = line.find(PREFIX)? + PREFIX.len();
3094                let end = line[start..].find('\'')? + start;
3095                line[start..end].parse().ok()
3096            })
3097            .collect()
3098    }
3099
3100    fn schema_info_query() -> Matcher {
3101        Matcher::AllOf(vec![
3102            Matcher::UrlEncoded("key".to_string(), "eq.schema_version".to_string()),
3103            Matcher::UrlEncoded("select".to_string(), "value".to_string()),
3104        ])
3105    }
3106
3107    fn encryption_metadata_query() -> Matcher {
3108        Matcher::AllOf(vec![
3109            Matcher::UrlEncoded(
3110                "select".to_string(),
3111                "version,kdf,salt,scrypt_log_n,scrypt_r,scrypt_p".to_string(),
3112            ),
3113            Matcher::UrlEncoded("limit".to_string(), "1".to_string()),
3114        ])
3115    }
3116
3117    fn jwt_with_expiry(exp: u64) -> String {
3118        let header = general_purpose::URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
3119        let payload =
3120            general_purpose::URL_SAFE_NO_PAD.encode(format!(r#"{{"exp":{exp}}}"#).as_bytes());
3121
3122        format!("{header}.{payload}.signature")
3123    }
3124
3125    fn unix_now() -> u64 {
3126        SystemTime::now()
3127            .duration_since(UNIX_EPOCH)
3128            .expect("system time should be after unix epoch")
3129            .as_secs()
3130    }
3131
3132    #[test]
3133    fn supabase_wallet_database_implements_wallet_database() {
3134        fn assert_wallet_database<T: WalletDatabase<DatabaseError>>() {}
3135
3136        assert_wallet_database::<SupabaseWalletDatabase>();
3137    }
3138
3139    #[test]
3140    fn schema_sql_tracks_required_schema_version() {
3141        let schema_sql = SupabaseWalletDatabase::get_schema_sql();
3142        let versions = extract_schema_versions(&schema_sql);
3143
3144        assert!(
3145            !versions.is_empty(),
3146            "embedded schema should expose a schema version"
3147        );
3148        assert!(schema_sql.contains("CREATE TABLE IF NOT EXISTS schema_info"));
3149        assert!(schema_sql.contains("CREATE TABLE IF NOT EXISTS p2pk_signing_key"));
3150        assert!(schema_sql.contains("CREATE TABLE IF NOT EXISTS wallet_encryption_metadata"));
3151        assert_eq!(
3152            versions.last().copied(),
3153            Some(SupabaseWalletDatabase::REQUIRED_SCHEMA_VERSION)
3154        );
3155        assert_eq!(
3156            versions.iter().max().copied(),
3157            Some(SupabaseWalletDatabase::REQUIRED_SCHEMA_VERSION)
3158        );
3159    }
3160
3161    #[tokio::test]
3162    async fn schema_compatibility_uses_api_key_without_jwt() {
3163        let mut server = mockito::Server::new_async().await;
3164        let required_version = SupabaseWalletDatabase::REQUIRED_SCHEMA_VERSION.to_string();
3165        let mock = server
3166            .mock("GET", "/rest/v1/schema_info")
3167            .match_query(schema_info_query())
3168            .match_header("apikey", "anon-key")
3169            .match_header("authorization", "Bearer anon-key")
3170            .with_status(200)
3171            .with_header("content-type", "application/json")
3172            .with_body(format!(r#"[{{"value":"{required_version}"}}]"#))
3173            .create_async()
3174            .await;
3175
3176        let db = SupabaseWalletDatabase::new(
3177            Url::parse(&server.url()).expect("mock server URL should parse"),
3178            "anon-key".to_string(),
3179        )
3180        .await
3181        .expect("database should initialize");
3182
3183        db.check_schema_compatibility()
3184            .await
3185            .expect("required schema version should pass");
3186
3187        mock.assert_async().await;
3188    }
3189
3190    #[tokio::test]
3191    async fn schema_compatibility_reports_outdated_schema() {
3192        let mut server = mockito::Server::new_async().await;
3193        let found_version = SupabaseWalletDatabase::REQUIRED_SCHEMA_VERSION - 1;
3194        let mock = server
3195            .mock("GET", "/rest/v1/schema_info")
3196            .match_query(schema_info_query())
3197            .match_header("apikey", "anon-key")
3198            .match_header("authorization", "Bearer anon-key")
3199            .with_status(200)
3200            .with_header("content-type", "application/json")
3201            .with_body(format!(r#"[{{"value":"{found_version}"}}]"#))
3202            .create_async()
3203            .await;
3204
3205        let db = SupabaseWalletDatabase::new(
3206            Url::parse(&server.url()).expect("mock server URL should parse"),
3207            "anon-key".to_string(),
3208        )
3209        .await
3210        .expect("database should initialize");
3211
3212        let error = db
3213            .check_schema_compatibility()
3214            .await
3215            .expect_err("outdated schema should fail");
3216
3217        match error {
3218            Error::SchemaMismatch { required, found } => {
3219                assert_eq!(required, SupabaseWalletDatabase::REQUIRED_SCHEMA_VERSION);
3220                assert_eq!(found, found_version);
3221            }
3222            other => panic!("expected schema mismatch error, got {other:?}"),
3223        }
3224
3225        mock.assert_async().await;
3226    }
3227
3228    #[tokio::test]
3229    async fn schema_compatibility_reports_missing_schema_info() {
3230        let mut server = mockito::Server::new_async().await;
3231        let mock = server
3232            .mock("GET", "/rest/v1/schema_info")
3233            .match_query(schema_info_query())
3234            .match_header("apikey", "anon-key")
3235            .match_header("authorization", "Bearer anon-key")
3236            .with_status(404)
3237            .with_body(r#"{"message":"relation \"schema_info\" does not exist"}"#)
3238            .create_async()
3239            .await;
3240
3241        let db = SupabaseWalletDatabase::new(
3242            Url::parse(&server.url()).expect("mock server URL should parse"),
3243            "anon-key".to_string(),
3244        )
3245        .await
3246        .expect("database should initialize");
3247
3248        let error = db
3249            .check_schema_compatibility()
3250            .await
3251            .expect_err("missing schema_info should fail");
3252
3253        match error {
3254            Error::SchemaNotInitialized => {}
3255            other => panic!("expected schema not initialized error, got {other:?}"),
3256        }
3257
3258        mock.assert_async().await;
3259    }
3260
3261    #[tokio::test]
3262    async fn call_rpc_uses_jwt_token_and_serializes_json_body() {
3263        let mut server = mockito::Server::new_async().await;
3264        let mock = server
3265            .mock("POST", "/rest/v1/rpc/update_proofs_atomic")
3266            .match_header("apikey", "anon-key")
3267            .match_header("authorization", "Bearer jwt-token")
3268            .match_header("content-type", "application/json")
3269            .match_body(Matcher::Json(json!({ "proofs": [] })))
3270            .with_status(200)
3271            .with_header("content-type", "application/json")
3272            .with_body(r#"{"updated":1}"#)
3273            .create_async()
3274            .await;
3275
3276        let db = SupabaseWalletDatabase::new(
3277            Url::parse(&server.url()).expect("mock server URL should parse"),
3278            "anon-key".to_string(),
3279        )
3280        .await
3281        .expect("database should initialize");
3282        db.set_jwt_token(Some("jwt-token".to_string())).await;
3283
3284        let response = db
3285            .call_rpc("update_proofs_atomic", r#"{"proofs":[]}"#)
3286            .await
3287            .expect("RPC request should succeed");
3288
3289        assert_eq!(
3290            serde_json::from_str::<serde_json::Value>(&response)
3291                .expect("RPC response should be JSON"),
3292            json!({ "updated": 1 })
3293        );
3294        mock.assert_async().await;
3295    }
3296
3297    #[tokio::test]
3298    async fn call_rpc_encodes_function_name_path_segments() {
3299        let mut server = mockito::Server::new_async().await;
3300        let mock = server
3301            .mock(
3302                "POST",
3303                "/rest/v1/rpc/..%2F..%2F..%2Fauth%2Fv1%2Fadmin%2Fusers",
3304            )
3305            .match_header("apikey", "anon-key")
3306            .match_header("authorization", "Bearer jwt-token")
3307            .match_header("content-type", "application/json")
3308            .match_body(Matcher::Json(json!({})))
3309            .with_status(200)
3310            .with_header("content-type", "application/json")
3311            .with_body(r#"{"ok":true}"#)
3312            .create_async()
3313            .await;
3314
3315        let db = SupabaseWalletDatabase::new(
3316            Url::parse(&server.url()).expect("mock server URL should parse"),
3317            "anon-key".to_string(),
3318        )
3319        .await
3320        .expect("database should initialize");
3321        db.set_jwt_token(Some("jwt-token".to_string())).await;
3322
3323        let response = db
3324            .call_rpc("../../../auth/v1/admin/users", "{}")
3325            .await
3326            .expect("RPC request should succeed");
3327
3328        assert_eq!(
3329            serde_json::from_str::<serde_json::Value>(&response)
3330                .expect("RPC response should be JSON"),
3331            json!({ "ok": true })
3332        );
3333        mock.assert_async().await;
3334    }
3335
3336    #[tokio::test]
3337    async fn schema_compatibility_refreshes_expiring_supabase_tokens() {
3338        let mut server = mockito::Server::new_async().await;
3339        let refreshed_token = "fresh-access-token";
3340        let refreshed_auth_header = format!("Bearer {refreshed_token}");
3341        let required_version = SupabaseWalletDatabase::REQUIRED_SCHEMA_VERSION.to_string();
3342
3343        let refresh_mock = server
3344            .mock("POST", "/auth/v1/token")
3345            .match_query(Matcher::UrlEncoded(
3346                "grant_type".to_string(),
3347                "refresh_token".to_string(),
3348            ))
3349            .match_header("apikey", "anon-key")
3350            .match_header("content-type", "application/json")
3351            .match_body(Matcher::Json(json!({ "refresh_token": "refresh-token" })))
3352            .with_status(200)
3353            .with_header("content-type", "application/json")
3354            .with_body(format!(
3355                r#"{{"access_token":"{refreshed_token}","refresh_token":"rotated-refresh-token","expires_in":3600}}"#
3356            ))
3357            .create_async()
3358            .await;
3359
3360        let schema_mock = server
3361            .mock("GET", "/rest/v1/schema_info")
3362            .match_query(schema_info_query())
3363            .match_header("apikey", "anon-key")
3364            .match_header("authorization", refreshed_auth_header.as_str())
3365            .with_status(200)
3366            .with_header("content-type", "application/json")
3367            .with_body(format!(r#"[{{"value":"{required_version}"}}]"#))
3368            .create_async()
3369            .await;
3370
3371        let db = SupabaseWalletDatabase::with_supabase_auth(
3372            Url::parse(&server.url()).expect("mock server URL should parse"),
3373            "anon-key".to_string(),
3374        )
3375        .await
3376        .expect("database should initialize");
3377        db.set_refresh_token(Some("refresh-token".to_string()))
3378            .await;
3379        db.set_jwt_token(Some(jwt_with_expiry(unix_now().saturating_sub(1))))
3380            .await;
3381
3382        db.check_schema_compatibility()
3383            .await
3384            .expect("token refresh should make schema check succeed");
3385
3386        assert_eq!(db.get_jwt_token().await.as_deref(), Some(refreshed_token));
3387        refresh_mock.assert_async().await;
3388        schema_mock.assert_async().await;
3389    }
3390
3391    #[test]
3392    fn encryption_key_derivation_uses_scrypt_metadata() {
3393        use bitcoin::hashes::{sha256, Hash};
3394
3395        let metadata = EncryptionMetadataTable {
3396            version: ENCRYPTION_METADATA_VERSION,
3397            kdf: ENCRYPTION_KDF.to_string(),
3398            salt: "000102030405060708090a0b0c0d0e0f".to_string(),
3399            scrypt_log_n: SCRYPT_LOG_N as i64,
3400            scrypt_r: SCRYPT_R as i64,
3401            scrypt_p: SCRYPT_P as i64,
3402            _extra: Default::default(),
3403        };
3404        let key = SupabaseWalletDatabase::derive_encryption_key("password", &metadata)
3405            .expect("scrypt key derivation should succeed");
3406        let raw_sha = sha256::Hash::hash(b"password");
3407
3408        assert_ne!(key.as_slice(), &raw_sha.as_byte_array()[..]);
3409    }
3410
3411    #[tokio::test]
3412    async fn set_encryption_password_creates_metadata_and_sets_key() {
3413        use bitcoin::hashes::{sha256, Hash};
3414
3415        let mut server = mockito::Server::new_async().await;
3416        let get_mock = server
3417            .mock("GET", "/rest/v1/wallet_encryption_metadata")
3418            .match_query(encryption_metadata_query())
3419            .match_header("apikey", "anon-key")
3420            .match_header("authorization", "Bearer anon-key")
3421            .with_status(200)
3422            .with_header("content-type", "application/json")
3423            .with_body("[]")
3424            .create_async()
3425            .await;
3426        let insert_mock = server
3427            .mock("POST", "/rest/v1/wallet_encryption_metadata")
3428            .match_header("apikey", "anon-key")
3429            .match_header("authorization", "Bearer anon-key")
3430            .match_header("prefer", "missing=default")
3431            .with_status(201)
3432            .create_async()
3433            .await;
3434
3435        let db = SupabaseWalletDatabase::new(
3436            Url::parse(&server.url()).expect("mock server URL should parse"),
3437            "anon-key".to_string(),
3438        )
3439        .await
3440        .expect("database should initialize");
3441
3442        db.set_encryption_password("password")
3443            .await
3444            .expect("encryption password should be set");
3445
3446        let guard = db.encryption_key.read().await;
3447        let key = guard.as_ref().expect("encryption key should be set");
3448        let raw_sha = sha256::Hash::hash(b"password");
3449        assert_ne!(key.as_slice(), &raw_sha.as_byte_array()[..]);
3450
3451        get_mock.assert_async().await;
3452        insert_mock.assert_async().await;
3453    }
3454
3455    #[tokio::test]
3456    async fn set_encryption_password_reuses_existing_metadata() {
3457        let mut server = mockito::Server::new_async().await;
3458        let salt = "000102030405060708090a0b0c0d0e0f";
3459        let get_mock = server
3460            .mock("GET", "/rest/v1/wallet_encryption_metadata")
3461            .match_query(encryption_metadata_query())
3462            .match_header("apikey", "anon-key")
3463            .match_header("authorization", "Bearer anon-key")
3464            .with_status(200)
3465            .with_header("content-type", "application/json")
3466            .with_body(format!(
3467                r#"[{{"version":1,"kdf":"scrypt","salt":"{salt}","scrypt_log_n":15,"scrypt_r":8,"scrypt_p":1}}]"#
3468            ))
3469            .create_async()
3470            .await;
3471
3472        let db = SupabaseWalletDatabase::new(
3473            Url::parse(&server.url()).expect("mock server URL should parse"),
3474            "anon-key".to_string(),
3475        )
3476        .await
3477        .expect("database should initialize");
3478
3479        db.set_encryption_password("password")
3480            .await
3481            .expect("encryption password should be set");
3482
3483        let metadata = EncryptionMetadataTable {
3484            version: ENCRYPTION_METADATA_VERSION,
3485            kdf: ENCRYPTION_KDF.to_string(),
3486            salt: salt.to_string(),
3487            scrypt_log_n: SCRYPT_LOG_N as i64,
3488            scrypt_r: SCRYPT_R as i64,
3489            scrypt_p: SCRYPT_P as i64,
3490            _extra: Default::default(),
3491        };
3492        let expected = SupabaseWalletDatabase::derive_encryption_key("password", &metadata)
3493            .expect("scrypt key derivation should succeed");
3494
3495        let guard = db.encryption_key.read().await;
3496        let key = guard.as_ref().expect("encryption key should be set");
3497        assert_eq!(key.as_slice(), expected.as_slice());
3498
3499        get_mock.assert_async().await;
3500    }
3501
3502    // -------------------------------------------------------------------------
3503    // Integration tests against a live (or local) Supabase instance.
3504    //
3505    // These tests are gated behind the `integration-tests` feature and the
3506    // environment variables:
3507    //   SUPABASE_URL       – e.g. http://localhost:54321 or https://<ref>.supabase.co
3508    //   SUPABASE_ANON_KEY  – the project's `anon` / publishable API key
3509    //
3510    // Each test signs up a fresh, unique user so that Row-Level Security
3511    // guarantees complete data isolation between concurrent test runs.
3512    // The test_id string passed to `provide_db` is used as a per-test
3513    // password for client-side AES-256-GCM encryption of proof secrets.
3514    //
3515    // To run locally against the Supabase CLI:
3516    //   supabase start                       # starts PostgREST + GoTrue + Postgres
3517    //   export SUPABASE_URL=http://localhost:54321
3518    //   export SUPABASE_ANON_KEY=$(supabase status --output json | jq -r '.ANON_KEY')
3519    //   # Apply migrations once:
3520    //   supabase db push   OR   run get_schema_sql() output in the SQL editor
3521    //   cargo test -p cdk-supabase
3522    // -------------------------------------------------------------------------
3523
3524    /// Build a fresh `SupabaseWalletDatabase` for one test run.
3525    ///
3526    /// * Signs up a brand-new Supabase Auth user whose email is derived from
3527    ///   `test_id`, ensuring RLS-based data isolation from all other tests.
3528    /// * Sets an AES-256-GCM encryption password equal to `test_id` so every
3529    ///   test uses a distinct key.
3530    /// * If the required environment variables are absent the function panics
3531    ///   with a clear message – the `wallet_db_test!` expansion will then
3532    ///   report the test as failed, which is intentional: the CI job that
3533    ///   enables this job *must* supply the credentials.
3534    #[cfg(feature = "integration-tests")]
3535    pub async fn provide_db(test_id: String) -> SupabaseWalletDatabase {
3536        let url_str = std::env::var("SUPABASE_URL")
3537            .expect("SUPABASE_URL must be set to run Supabase integration tests");
3538        let anon_key = std::env::var("SUPABASE_ANON_KEY")
3539            .expect("SUPABASE_ANON_KEY must be set to run Supabase integration tests");
3540
3541        let url = Url::parse(&url_str).expect("SUPABASE_URL is not a valid URL");
3542
3543        // Use a hash of the test_id as a short, filesystem-safe identifier.
3544        // Supabase email addresses must be ≤ 254 chars; UUIDs are safe here.
3545        let email_id = {
3546            use bitcoin::hashes::{sha256, Hash};
3547            let hash = sha256::Hash::hash(test_id.as_bytes());
3548            hex::encode(&hash.as_byte_array()[..8]) // 16 hex chars
3549        };
3550        let email = format!("cdk-test-{}@example.com", email_id);
3551        // Use a fixed-length password derived from the test id.
3552        let password = {
3553            use bitcoin::hashes::{sha256, Hash};
3554            let hash = sha256::Hash::hash(test_id.as_bytes());
3555            hex::encode(hash.as_byte_array()) // 64 hex chars, always valid
3556        };
3557
3558        let db = SupabaseWalletDatabase::with_supabase_auth(url, anon_key)
3559            .await
3560            .expect("failed to create SupabaseWalletDatabase");
3561
3562        // Sign up a fresh user. Ignore "already registered" errors that can
3563        // occur if a previous test run with the same hash left the user behind.
3564        let auth_result = db.signup(&email, &password).await;
3565        match auth_result {
3566            Ok(_) => {}
3567            Err(Error::Supabase(msg)) if msg.contains("already registered") => {
3568                // Re-use the existing user by signing in instead.
3569                db.signin(&email, &password)
3570                    .await
3571                    .expect("failed to sign in with existing test user");
3572            }
3573            Err(e) => panic!("Supabase signup failed: {e}"),
3574        }
3575
3576        // Each test encrypts its proof secrets with a unique key.
3577        db.set_encryption_password(&test_id)
3578            .await
3579            .expect("failed to set Supabase wallet encryption password");
3580
3581        db
3582    }
3583
3584    #[cfg(feature = "integration-tests")]
3585    cdk_common::wallet_db_test!(provide_db);
3586}