use crate::database::DatabaseManager;
use alloy::primitives::{Address, FixedBytes};
use sqlx::Row;
use uuid::Uuid;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DataType {
Privacy,
Identity,
Secrets,
Confidential,
}
impl DataType {
pub fn as_str(&self) -> &'static str {
match self {
DataType::Privacy => "privacy",
DataType::Identity => "identity",
DataType::Secrets => "secrets",
DataType::Confidential => "confidential",
}
}
pub fn from_sql_str(s: &str) -> Option<Self> {
match s {
"privacy" => Some(DataType::Privacy),
"identity" => Some(DataType::Identity),
"secrets" => Some(DataType::Secrets),
"confidential" => Some(DataType::Confidential),
_ => None,
}
}
}
impl std::fmt::Display for DataType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[derive(Debug, Clone)]
pub struct EncryptedDataRefRecord {
pub id: Uuid,
pub data_type: DataType,
pub chain_id: u64,
pub sender_address: Address,
pub policy_client_address: Address,
pub envelope: Vec<u8>,
pub signature: Option<Vec<u8>>,
pub sender_pubkey: Option<Vec<u8>>,
pub created_at: chrono::DateTime<chrono::Utc>,
pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
pub domain: Option<FixedBytes<32>>,
pub owner: Option<Address>,
pub data_ref_id: Option<String>,
pub confirmed_at: Option<chrono::DateTime<chrono::Utc>>,
pub policy_data_address: Option<Address>,
}
#[derive(Clone, Debug)]
pub struct EncryptedDataRefRepository {
db: DatabaseManager,
}
impl EncryptedDataRefRepository {
pub fn new(db: DatabaseManager) -> Self {
Self { db }
}
#[allow(clippy::too_many_arguments)]
pub async fn insert(
&self,
sender_address: Address,
policy_client_address: Address,
envelope: &[u8],
signature: &[u8],
sender_pubkey: &[u8],
expires_at: Option<chrono::DateTime<chrono::Utc>>,
chain_id: u64,
) -> Result<Uuid, sqlx::Error> {
let row = sqlx::query(
r#"
INSERT INTO encrypted_data_refs
(data_type, chain_id, sender_address, policy_client_address, envelope, signature, recipient_pubkey, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id
"#,
)
.bind(DataType::Privacy.as_str())
.bind(chain_id as i64)
.bind(sender_address.as_slice())
.bind(policy_client_address.as_slice())
.bind(envelope)
.bind(signature)
.bind(sender_pubkey)
.bind(expires_at)
.fetch_one(self.db.pool())
.await?;
Ok(row.get("id"))
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_identity(
&self,
data_ref_id: &str,
identity_owner: Address,
identity_domain: FixedBytes<32>,
envelope: &[u8],
chain_id: u64,
expires_at: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<Uuid, sqlx::Error> {
let row = sqlx::query(
r#"
INSERT INTO encrypted_data_refs
(data_type, chain_id, sender_address, policy_client_address, envelope,
data_ref_id, owner, domain, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (data_ref_id) WHERE data_type = 'identity' DO NOTHING
RETURNING id
"#,
)
.bind(DataType::Identity.as_str())
.bind(chain_id as i64)
.bind(Address::ZERO.as_slice())
.bind(Address::ZERO.as_slice())
.bind(envelope)
.bind(data_ref_id)
.bind(identity_owner.as_slice())
.bind(identity_domain.as_slice())
.bind(expires_at)
.fetch_optional(self.db.pool())
.await?;
match row {
Some(r) => Ok(r.get("id")),
None => {
let existing = sqlx::query(
r#"SELECT id FROM encrypted_data_refs WHERE data_ref_id = $1 AND data_type = 'identity'"#,
)
.bind(data_ref_id)
.fetch_one(self.db.pool())
.await?;
Ok(existing.get("id"))
}
}
}
pub async fn upsert_secrets(
&self,
policy_client_address: Address,
policy_data_address: Address,
envelope: &[u8],
chain_id: u64,
) -> Result<Uuid, sqlx::Error> {
let row = sqlx::query(
r#"
INSERT INTO encrypted_data_refs
(data_type, chain_id, sender_address, policy_client_address, envelope, policy_data_address)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (chain_id, policy_client_address, policy_data_address)
WHERE data_type = 'secrets'
DO UPDATE SET envelope = EXCLUDED.envelope
RETURNING id
"#,
)
.bind(DataType::Secrets.as_str())
.bind(chain_id as i64)
.bind(Address::ZERO.as_slice())
.bind(policy_client_address.as_slice())
.bind(envelope)
.bind(policy_data_address.as_slice())
.fetch_one(self.db.pool())
.await?;
Ok(row.get("id"))
}
pub async fn get_by_id(&self, id: Uuid) -> Result<Option<EncryptedDataRefRecord>, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT id, data_type, chain_id, sender_address, policy_client_address, envelope,
signature, recipient_pubkey, created_at, expires_at,
data_ref_id, owner, domain, confirmed_at,
policy_data_address
FROM encrypted_data_refs
WHERE id = $1
AND (expires_at IS NULL OR expires_at > NOW())
"#,
)
.bind(id)
.fetch_optional(self.db.pool())
.await?;
Ok(row.map(|r| Self::row_to_record(&r)))
}
pub async fn get_by_ids(&self, ids: &[Uuid]) -> Result<Vec<EncryptedDataRefRecord>, sqlx::Error> {
let rows = sqlx::query(
r#"
SELECT id, data_type, chain_id, sender_address, policy_client_address, envelope,
signature, recipient_pubkey, created_at, expires_at,
data_ref_id, owner, domain, confirmed_at,
policy_data_address
FROM encrypted_data_refs
WHERE id = ANY($1)
AND (expires_at IS NULL OR expires_at > NOW())
"#,
)
.bind(ids)
.fetch_all(self.db.pool())
.await?;
Ok(rows.iter().map(Self::row_to_record).collect())
}
pub async fn get_by_sender_and_policy_client(
&self,
sender_address: Address,
policy_client_address: Address,
chain_id: u64,
) -> Result<Vec<EncryptedDataRefRecord>, sqlx::Error> {
let rows = sqlx::query(
r#"
SELECT id, data_type, chain_id, sender_address, policy_client_address, envelope,
signature, recipient_pubkey, created_at, expires_at,
data_ref_id, owner, domain, confirmed_at,
policy_data_address
FROM encrypted_data_refs
WHERE chain_id = $1 AND sender_address = $2 AND policy_client_address = $3
AND (expires_at IS NULL OR expires_at > NOW())
ORDER BY created_at DESC
"#,
)
.bind(chain_id as i64)
.bind(sender_address.as_slice())
.bind(policy_client_address.as_slice())
.fetch_all(self.db.pool())
.await?;
Ok(rows.iter().map(Self::row_to_record).collect())
}
pub async fn get_identity_by_ref_id(
&self,
data_ref_id: &str,
) -> Result<Option<EncryptedDataRefRecord>, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT id, data_type, chain_id, sender_address, policy_client_address, envelope,
signature, recipient_pubkey, created_at, expires_at,
data_ref_id, owner, domain, confirmed_at,
policy_data_address
FROM encrypted_data_refs
WHERE data_ref_id = $1 AND data_type = 'identity'
"#,
)
.bind(data_ref_id)
.fetch_optional(self.db.pool())
.await?;
Ok(row.map(|r| Self::row_to_record(&r)))
}
#[allow(clippy::too_many_arguments)]
pub async fn insert_confidential(
&self,
data_ref_id: &str,
provider: Address,
domain: FixedBytes<32>,
envelope: &[u8],
chain_id: u64,
expires_at: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<Uuid, sqlx::Error> {
let row = sqlx::query(
r#"
INSERT INTO encrypted_data_refs
(data_type, chain_id, sender_address, policy_client_address, envelope,
data_ref_id, owner, domain, expires_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (data_ref_id) WHERE data_type = 'confidential' DO NOTHING
RETURNING id
"#,
)
.bind(DataType::Confidential.as_str())
.bind(chain_id as i64)
.bind(Address::ZERO.as_slice())
.bind(Address::ZERO.as_slice())
.bind(envelope)
.bind(data_ref_id)
.bind(provider.as_slice())
.bind(domain.as_slice())
.bind(expires_at)
.fetch_optional(self.db.pool())
.await?;
match row {
Some(r) => Ok(r.get("id")),
None => {
let existing = sqlx::query(
r#"SELECT id FROM encrypted_data_refs WHERE data_ref_id = $1 AND data_type = 'confidential'"#,
)
.bind(data_ref_id)
.fetch_one(self.db.pool())
.await?;
Ok(existing.get("id"))
}
}
}
pub async fn get_by_ref_id(&self, data_ref_id: &str) -> Result<Option<EncryptedDataRefRecord>, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT id, data_type, chain_id, sender_address, policy_client_address, envelope,
signature, recipient_pubkey, created_at, expires_at,
data_ref_id, owner, domain, confirmed_at,
policy_data_address
FROM encrypted_data_refs
WHERE data_ref_id = $1
"#,
)
.bind(data_ref_id)
.fetch_optional(self.db.pool())
.await?;
Ok(row.map(|r| Self::row_to_record(&r)))
}
pub async fn get_confidential_by_ref_id(
&self,
data_ref_id: &str,
) -> Result<Option<EncryptedDataRefRecord>, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT id, data_type, chain_id, sender_address, policy_client_address, envelope,
signature, recipient_pubkey, created_at, expires_at,
data_ref_id, owner, domain, confirmed_at,
policy_data_address
FROM encrypted_data_refs
WHERE data_ref_id = $1 AND data_type = 'confidential'
"#,
)
.bind(data_ref_id)
.fetch_optional(self.db.pool())
.await?;
Ok(row.map(|r| Self::row_to_record(&r)))
}
pub async fn get_secrets_for_policy_client(
&self,
policy_client_address: Address,
policy_data_address: Address,
chain_id: u64,
) -> Result<Option<EncryptedDataRefRecord>, sqlx::Error> {
let row = sqlx::query(
r#"
SELECT id, data_type, chain_id, sender_address, policy_client_address, envelope,
signature, recipient_pubkey, created_at, expires_at,
data_ref_id, owner, domain, confirmed_at,
policy_data_address
FROM encrypted_data_refs
WHERE chain_id = $1
AND policy_client_address = $2
AND policy_data_address = $3
AND data_type = 'secrets'
"#,
)
.bind(chain_id as i64)
.bind(policy_client_address.as_slice())
.bind(policy_data_address.as_slice())
.fetch_optional(self.db.pool())
.await?;
Ok(row.map(|r| Self::row_to_record(&r)))
}
pub async fn confirm_identity(&self, data_ref_id: &str) -> Result<bool, sqlx::Error> {
let result = sqlx::query(
r#"
UPDATE encrypted_data_refs
SET confirmed_at = NOW()
WHERE data_ref_id = $1 AND data_type = 'identity' AND confirmed_at IS NULL
"#,
)
.bind(data_ref_id)
.execute(self.db.pool())
.await?;
Ok(result.rows_affected() > 0)
}
pub async fn delete_expired(&self) -> Result<u64, sqlx::Error> {
let result = sqlx::query(
r#"
DELETE FROM encrypted_data_refs
WHERE expires_at IS NOT NULL AND expires_at <= NOW()
"#,
)
.execute(self.db.pool())
.await?;
Ok(result.rows_affected())
}
fn bytes_to_address(b: &[u8]) -> Address {
if b.len() == 20 {
Address::from_slice(b)
} else {
Address::ZERO
}
}
fn row_to_record(r: &sqlx::postgres::PgRow) -> EncryptedDataRefRecord {
let sender_bytes: Vec<u8> = r.get("sender_address");
let policy_bytes: Vec<u8> = r.get("policy_client_address");
let chain_id_raw: i64 = r.get("chain_id");
let data_type_str: String = r.get("data_type");
let owner: Option<Address> = r.get::<Option<Vec<u8>>, _>("owner").map(|b| Self::bytes_to_address(&b));
let domain: Option<FixedBytes<32>> = r.get::<Option<Vec<u8>>, _>("domain").and_then(|b| {
if b.len() == 32 {
Some(FixedBytes::from_slice(&b))
} else {
None
}
});
let policy_data_address: Option<Address> = r
.get::<Option<Vec<u8>>, _>("policy_data_address")
.map(|b| Self::bytes_to_address(&b));
EncryptedDataRefRecord {
id: r.get("id"),
data_type: DataType::from_sql_str(&data_type_str).unwrap_or(DataType::Privacy),
chain_id: chain_id_raw as u64,
sender_address: Self::bytes_to_address(&sender_bytes),
policy_client_address: Self::bytes_to_address(&policy_bytes),
envelope: r.get("envelope"),
signature: r.get("signature"),
sender_pubkey: r.get("recipient_pubkey"),
created_at: r.get("created_at"),
expires_at: r.get("expires_at"),
domain,
owner,
data_ref_id: r.get("data_ref_id"),
confirmed_at: r.get("confirmed_at"),
policy_data_address,
}
}
}