use sqlx::{
database::HasArguments, migrate::Migrator, query::Query, Database, Decode, Encode, Type,
};
use self::private::Sealed;
/// Private home of the [`Sealed`] marker trait.
///
/// `SupportedDatabase` lists `Sealed` as a supertrait, so only the database
/// types that receive a `Sealed` impl in this module can implement it.
mod private {
    #[cfg(feature = "postgres")]
    use sqlx::postgres::Postgres;
    #[cfg(feature = "sqlite")]
    use sqlx::sqlite::Sqlite;

    /// Marker trait used to seal `SupportedDatabase`.
    // `pub` is required so the trait can appear as a supertrait bound of a
    // public trait, even though this module itself is private.
    #[allow(unreachable_pub)]
    pub trait Sealed {}

    #[cfg(feature = "sqlite")]
    impl Sealed for Sqlite {}

    #[cfg(feature = "postgres")]
    impl Sealed for Postgres {}
}
/// Shorthand for a value that can be sent to and read back from database `DB`.
///
/// A type satisfies this trait when it has an SQL [`Type`] for `DB` and can be
/// both encoded and decoded for *every* lifetime (i.e. it does not borrow from
/// the query or the row).
pub trait SqlType<DB>:
    Type<DB> + for<'a> Encode<'a, DB> + for<'a> Decode<'a, DB>
where
    DB: Database,
{
}
/// Blanket implementation: every type that meets the bounds is a `SqlType`.
impl<DB, T> SqlType<DB> for T
where
    DB: Database,
    T: Type<DB>,
    T: for<'a> Encode<'a, DB>,
    T: for<'a> Decode<'a, DB>,
{
}
/// Shorthand for a value that can be sent to and read back from database `DB`
/// for one specific lifetime `'a`.
///
/// Unlike `SqlType`, the encode/decode bounds only have to hold for `'a`, and
/// the type itself must outlive `'a`.
pub trait BorrowedSqlType<'a, DB>:
    'a + Type<DB> + Encode<'a, DB> + Decode<'a, DB>
where
    DB: Database,
{
}
/// Blanket implementation: every type that meets the bounds for `'a` is a
/// `BorrowedSqlType<'a, DB>`.
impl<'a, DB, T> BorrowedSqlType<'a, DB> for T
where
    DB: Database,
    T: 'a,
    T: Type<DB>,
    T: Encode<'a, DB>,
    T: Decode<'a, DB>,
{
}
/// A database backend for which this file provides the SQL statements.
///
/// The trait is sealed through the private `Sealed` supertrait. Every method
/// other than [`get_migrator`](Self::get_migrator) has a default body that
/// returns an *unbound* [`Query`]; callers are expected to bind the positional
/// parameters (`$1`, `$2`, …) listed in each method's documentation.
/// Backends may override individual methods when their SQL dialect differs.
// NOTE(review): the reason for allowing `single_use_lifetimes` is not visible
// in this file; the attribute is kept as-is.
#[allow(single_use_lifetimes)]
#[doc(hidden)]
pub trait SupportedDatabase: Database + Sealed {
    /// Returns the migrator holding the schema migrations for this backend.
    fn get_migrator() -> &'static Migrator;

    /// Inserts a key/value pair, replacing the value if the key exists.
    ///
    /// # Arguments
    /// * `$1` - `kv_key`
    /// * `$2` - `kv_value`
    fn kv_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO statestore_kv (kv_key, kv_value)
VALUES ($1, $2)
ON CONFLICT (kv_key) DO UPDATE SET kv_value = $2
"#,
        )
    }

    /// Selects `kv_value` for a key.
    ///
    /// # Arguments
    /// * `$1` - `kv_key`
    fn kv_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT kv_value FROM statestore_kv WHERE kv_key = $1
"#,
        )
    }

    /// Updates `last_access` of a media entry to the current time and returns
    /// its `media_data`.
    ///
    /// The default body uses `NOW()`; backends without that function must
    /// override this method.
    ///
    /// # Arguments
    /// * `$1` - `media_url`
    fn media_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
UPDATE statestore_media
SET last_access = NOW()
WHERE media_url = $1
RETURNING media_data
"#,
        )
    }

    /// Inserts a media entry with `last_access` set to the current time;
    /// does nothing if the URL is already stored.
    ///
    /// The default body uses `NOW()`; backends without that function must
    /// override this method.
    ///
    /// # Arguments
    /// * `$1` - `media_url`
    /// * `$2` - `media_data`
    fn media_insert_query_1<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO statestore_media (media_url, media_data, last_access)
VALUES ($1, $2, NOW())
ON CONFLICT (media_url) DO NOTHING
"#,
        )
    }

    /// Deletes every media entry except the 100 most recently accessed ones.
    ///
    /// Takes no bind parameters.
    fn media_insert_query_2<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
DELETE FROM statestore_media
WHERE media_url NOT IN
(SELECT media_url FROM statestore_media
ORDER BY last_access DESC
LIMIT 100)
"#,
        )
    }

    /// Deletes a media entry.
    ///
    /// # Arguments
    /// * `$1` - `media_url`
    fn media_delete_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
DELETE FROM statestore_media
WHERE media_url = $1
"#,
        )
    }

    /// Returns the statements that delete all rows belonging to a room, one
    /// statement per table.
    ///
    /// # Arguments (for each returned query)
    /// * `$1` - `room_id`
    #[must_use]
    fn room_remove_queries<'q>() -> Vec<Query<'q, Self, <Self as HasArguments<'q>>::Arguments>> {
        vec![
            sqlx::query("DELETE FROM statestore_rooms WHERE room_id = $1"),
            sqlx::query("DELETE FROM statestore_accountdata WHERE room_id = $1"),
            sqlx::query("DELETE FROM statestore_members WHERE room_id = $1"),
            sqlx::query("DELETE FROM statestore_state WHERE room_id = $1"),
            sqlx::query("DELETE FROM statestore_receipts WHERE room_id = $1"),
        ]
    }

    /// Inserts an account data event, replacing it on conflict.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `event_type`
    /// * `$3` - `account_data`
    fn account_data_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO statestore_accountdata
(room_id, event_type, account_data)
VALUES ($1, $2, $3)
ON CONFLICT(room_id, event_type) DO UPDATE SET account_data = $3
"#,
        )
    }

    /// Selects `account_data` for a room and event type.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `event_type`
    fn account_data_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT account_data FROM statestore_accountdata
WHERE room_id = $1 AND event_type = $2
"#,
        )
    }

    /// Inserts a user's presence, replacing it on conflict.
    ///
    /// # Arguments
    /// * `$1` - `user_id`
    /// * `$2` - `presence`
    fn presence_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO statestore_presence
(user_id, presence)
VALUES ($1, $2)
ON CONFLICT(user_id) DO UPDATE SET presence = $2
"#,
        )
    }

    /// Selects `presence` for a user.
    ///
    /// # Arguments
    /// * `$1` - `user_id`
    fn presence_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT presence FROM statestore_presence
WHERE user_id = $1
"#,
        )
    }

    /// Inserts a room member, replacing all non-key columns on conflict.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `user_id`
    /// * `$3` - `is_partial`
    /// * `$4` - `member_event`
    /// * `$5` - `displayname`
    /// * `$6` - `joined`
    fn member_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO statestore_members
(room_id, user_id, is_partial, member_event, displayname, joined)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT(room_id, user_id) DO UPDATE SET is_partial = $3, member_event = $4, displayname = $5, joined = $6
"#,
        )
    }

    /// Inserts a member row carrying a user profile; on conflict only
    /// `user_profile` is replaced (`is_partial` is left untouched).
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `user_id`
    /// * `$3` - `is_partial` (used for newly inserted rows only)
    /// * `$4` - `user_profile`
    fn member_profile_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO statestore_members
(room_id, user_id, is_partial, user_profile)
VALUES ($1, $2, $3, $4)
ON CONFLICT(room_id, user_id) DO UPDATE SET user_profile = $4
"#,
        )
    }

    /// Inserts a state event, replacing it on conflict.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `event_type`
    /// * `$3` - `state_key`
    /// * `$4` - `is_partial`
    /// * `$5` - `state_event`
    fn state_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO statestore_state
(room_id, event_type, state_key, is_partial, state_event)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT(room_id, event_type, state_key) DO UPDATE SET is_partial = $4, state_event = $5
"#,
        )
    }

    /// Inserts room info, replacing it on conflict.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `is_partial`
    /// * `$3` - `room_info`
    fn room_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO statestore_rooms
(room_id, is_partial, room_info)
VALUES ($1, $2, $3)
ON CONFLICT(room_id) DO UPDATE SET is_partial = $2, room_info = $3
"#,
        )
    }

    /// Inserts a receipt, replacing the one stored for the same room, receipt
    /// type and user.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `event_id`
    /// * `$3` - `receipt_type`
    /// * `$4` - `user_id`
    /// * `$5` - `receipt`
    fn receipt_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO statestore_receipts
(room_id, event_id, receipt_type, user_id, receipt)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT(room_id, receipt_type, user_id) DO UPDATE SET event_id = $2, receipt_type = $3, receipt = $5
"#,
        )
    }

    /// Selects `state_event` for one state key, restricted to rows whose
    /// `is_partial` equals the literal `'0'`.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `event_type`
    /// * `$3` - `state_key`
    fn state_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT state_event FROM statestore_state
WHERE room_id = $1 AND event_type = $2 AND state_key = $3 AND is_partial = '0'
"#,
        )
    }

    /// Selects `state_event` for all state keys of an event type.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `event_type`
    /// * `$3` - `is_partial`
    fn states_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT state_event FROM statestore_state
WHERE room_id = $1 AND event_type = $2 AND is_partial = $3
"#,
        )
    }

    /// Selects the non-null `user_profile` of a room member.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `user_id`
    fn profile_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT user_profile FROM statestore_members
WHERE room_id = $1 AND user_id = $2 AND user_profile IS NOT NULL
"#,
        )
    }

    /// Deletes a member from a room.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `user_id`
    fn member_remove_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        // Member rows live in `statestore_members`, the table every other
        // member query in this trait reads from and writes to.
        sqlx::query(
            r#"
DELETE FROM statestore_members
WHERE room_id = $1 AND user_id = $2
"#,
        )
    }

    /// Selects the `user_id` of every member row of a room.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    fn members_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT user_id FROM statestore_members
WHERE room_id = $1
"#,
        )
    }

    /// Selects the `user_id` of every member row of a room with a given
    /// `joined` value.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `joined`
    fn members_load_query_with_join_status<'q>(
    ) -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT user_id FROM statestore_members
WHERE room_id = $1 AND joined = $2
"#,
        )
    }

    /// Selects `is_partial` and the non-null `member_event` of a room member.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `user_id`
    fn member_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT is_partial, member_event FROM statestore_members
WHERE room_id = $1 AND user_id = $2 AND member_event IS NOT NULL
"#,
        )
    }

    /// Selects `room_info` for every room with a given `is_partial` value.
    ///
    /// # Arguments
    /// * `$1` - `is_partial`
    fn room_info_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT room_info FROM statestore_rooms
WHERE is_partial = $1
"#,
        )
    }

    /// Selects the `user_id` of every member of a room with a given display
    /// name.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `displayname`
    fn users_with_display_name_load_query<'q>(
    ) -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT user_id FROM statestore_members
WHERE room_id = $1 AND displayname = $2
"#,
        )
    }

    /// Selects `event_id` and `receipt` for a user's receipt in a room.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `receipt_type`
    /// * `$3` - `user_id`
    fn receipt_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT event_id, receipt FROM statestore_receipts
WHERE room_id = $1 AND receipt_type = $2 AND user_id = $3
"#,
        )
    }

    /// Selects `user_id` and `receipt` for all receipts on an event.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `receipt_type`
    /// * `$3` - `event_id`
    fn event_receipt_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT user_id, receipt FROM statestore_receipts
WHERE room_id = $1 AND receipt_type = $2 AND event_id = $3
"#,
        )
    }

    /// Inserts a session row (plain insert, no conflict handling).
    ///
    /// # Arguments
    /// * `$1` - `sender_key`
    /// * `$2` - `session_data`
    #[cfg(feature = "e2e-encryption")]
    fn session_store_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO cryptostore_session (sender_key, session_data)
VALUES ($1, $2)
"#,
        )
    }

    /// Inserts a message hash row (plain insert, no conflict handling).
    ///
    /// # Arguments
    /// * `$1` - `sender_key`
    /// * `$2` - `message_hash`
    #[cfg(feature = "e2e-encryption")]
    fn olm_message_hash_store_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments>
    {
        sqlx::query(
            r#"
INSERT INTO cryptostore_message_hash (sender_key, message_hash)
VALUES ($1, $2)
"#,
        )
    }

    /// Inserts an inbound group session, replacing `session_data` on conflict.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `sender_key`
    /// * `$3` - `session_id`
    /// * `$4` - `session_data`
    #[cfg(feature = "e2e-encryption")]
    fn inbound_group_session_upsert_query<'q>(
    ) -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO cryptostore_inbound_group_session
(room_id, sender_key, session_id, session_data)
VALUES ($1, $2, $3, $4)
ON CONFLICT (room_id, sender_key, session_id)
DO UPDATE SET session_data = $4
"#,
        )
    }

    /// Inserts an outbound group session row (plain insert, no conflict
    /// handling).
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `session_data`
    #[cfg(feature = "e2e-encryption")]
    fn outbound_group_session_store_query<'q>(
    ) -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO cryptostore_outbound_group_session (room_id, session_data)
VALUES ($1, $2)
"#,
        )
    }

    /// Inserts a gossip request, replacing all other columns when the
    /// `request_id` already exists.
    ///
    /// # Arguments
    /// * `$1` - `recipient_id`
    /// * `$2` - `request_id`
    /// * `$3` - `info_key`
    /// * `$4` - `sent_out`
    /// * `$5` - `gossip_data`
    #[cfg(feature = "e2e-encryption")]
    fn gossip_request_store_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO cryptostore_gossip_request (recipient_id, request_id, info_key, sent_out, gossip_data)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (request_id)
DO UPDATE SET recipient_id = $1, info_key = $3, sent_out = $4, gossip_data = $5
"#,
        )
    }

    /// Inserts a user identity, replacing it on conflict.
    ///
    /// # Arguments
    /// * `$1` - `user_id`
    /// * `$2` - `identity_data`
    #[cfg(feature = "e2e-encryption")]
    fn identity_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO cryptostore_identity (user_id, identity_data)
VALUES ($1, $2)
ON CONFLICT (user_id) DO UPDATE SET identity_data = $2
"#,
        )
    }

    /// Inserts a device, replacing `device_info` on conflict.
    ///
    /// # Arguments
    /// * `$1` - `user_id`
    /// * `$2` - `device_id`
    /// * `$3` - `device_info`
    #[cfg(feature = "e2e-encryption")]
    fn device_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO cryptostore_device (user_id, device_id, device_info)
VALUES ($1, $2, $3)
ON CONFLICT (user_id, device_id) DO UPDATE SET device_info = $3
"#,
        )
    }

    /// Deletes a device.
    ///
    /// # Arguments
    /// * `$1` - `user_id`
    /// * `$2` - `device_id`
    #[cfg(feature = "e2e-encryption")]
    fn device_delete_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
DELETE FROM cryptostore_device
WHERE user_id = $1 AND device_id = $2
"#,
        )
    }

    /// Selects `session_data` for all sessions with a given sender key.
    ///
    /// # Arguments
    /// * `$1` - `sender_key`
    #[cfg(feature = "e2e-encryption")]
    fn sessions_for_user_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT session_data FROM cryptostore_session
WHERE sender_key = $1
"#,
        )
    }

    /// Selects `session_data` of one inbound group session.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    /// * `$2` - `sender_key`
    /// * `$3` - `session_id`
    #[cfg(feature = "e2e-encryption")]
    fn inbound_group_session_fetch_query<'q>(
    ) -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT session_data FROM cryptostore_inbound_group_session
WHERE room_id = $1 AND sender_key = $2 AND session_id = $3
"#,
        )
    }

    /// Selects `session_data` of every inbound group session.
    ///
    /// Takes no bind parameters.
    #[cfg(feature = "e2e-encryption")]
    fn inbound_group_sessions_fetch_query<'q>(
    ) -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT session_data FROM cryptostore_inbound_group_session
"#,
        )
    }

    /// Selects `session_data` of the outbound group session(s) of a room.
    ///
    /// # Arguments
    /// * `$1` - `room_id`
    #[cfg(feature = "e2e-encryption")]
    fn outbound_group_session_load_query<'q>(
    ) -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT session_data FROM cryptostore_outbound_group_session
WHERE room_id = $1
"#,
        )
    }

    /// Inserts a tracked user, replacing its data on conflict.
    ///
    /// # Arguments
    /// * `$1` - `user_id`
    /// * `$2` - `tracked_user_data`
    #[cfg(feature = "e2e-encryption")]
    fn tracked_user_upsert_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
INSERT INTO cryptostore_tracked_user (user_id, tracked_user_data)
VALUES ($1, $2)
ON CONFLICT (user_id) DO UPDATE SET tracked_user_data = $2
"#,
        )
    }

    /// Selects `device_info` of one device.
    ///
    /// # Arguments
    /// * `$1` - `user_id`
    /// * `$2` - `device_id`
    #[cfg(feature = "e2e-encryption")]
    fn device_fetch_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT device_info FROM cryptostore_device
WHERE user_id = $1 AND device_id = $2
"#,
        )
    }

    /// Selects `device_info` of every device of a user.
    ///
    /// # Arguments
    /// * `$1` - `user_id`
    #[cfg(feature = "e2e-encryption")]
    fn devices_for_user_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT device_info FROM cryptostore_device
WHERE user_id = $1
"#,
        )
    }

    /// Selects `tracked_user_data` of every tracked user.
    ///
    /// Takes no bind parameters.
    #[cfg(feature = "e2e-encryption")]
    fn tracked_users_fetch_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT tracked_user_data FROM cryptostore_tracked_user
"#,
        )
    }

    /// Selects `identity_data` of a user.
    ///
    /// # Arguments
    /// * `$1` - `user_id`
    #[cfg(feature = "e2e-encryption")]
    fn identity_fetch_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT identity_data FROM cryptostore_identity
WHERE user_id = $1
"#,
        )
    }

    /// Selects a constant `1` for each stored row matching the sender key and
    /// message hash, i.e. yields a row only if the hash is already stored.
    ///
    /// # Arguments
    /// * `$1` - `sender_key`
    /// * `$2` - `message_hash`
    #[cfg(feature = "e2e-encryption")]
    fn message_known_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT 1 FROM cryptostore_message_hash
WHERE sender_key = $1 AND message_hash = $2
"#,
        )
    }

    /// Selects `gossip_data` of a gossip request by request ID.
    ///
    /// # Arguments
    /// * `$1` - `request_id`
    #[cfg(feature = "e2e-encryption")]
    fn gossip_request_fetch_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT gossip_data FROM cryptostore_gossip_request
WHERE request_id = $1
"#,
        )
    }

    /// Selects `gossip_data` of gossip requests by info key.
    ///
    /// # Arguments
    /// * `$1` - `info_key`
    #[cfg(feature = "e2e-encryption")]
    fn gossip_request_info_fetch_query<'q>(
    ) -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT gossip_data FROM cryptostore_gossip_request
WHERE info_key = $1
"#,
        )
    }

    /// Selects `gossip_data` of gossip requests with a given `sent_out` value.
    ///
    /// # Arguments
    /// * `$1` - `sent_out`
    #[cfg(feature = "e2e-encryption")]
    fn gossip_requests_sent_state_fetch_query<'q>(
    ) -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
SELECT gossip_data FROM cryptostore_gossip_request
WHERE sent_out = $1
"#,
        )
    }

    /// Deletes a gossip request by request ID.
    ///
    /// # Arguments
    /// * `$1` - `request_id`
    #[cfg(feature = "e2e-encryption")]
    fn gossip_request_delete_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        sqlx::query(
            r#"
DELETE FROM cryptostore_gossip_request
WHERE request_id = $1
"#,
        )
    }
}
/// PostgreSQL backend: uses every default query of `SupportedDatabase` and
/// only supplies its own migrations.
#[cfg(feature = "postgres")]
impl SupportedDatabase for sqlx::postgres::Postgres {
    /// Returns the migrator embedded from `./migrations/postgres`.
    fn get_migrator() -> &'static Migrator {
        // Migrations are embedded at compile time by `sqlx::migrate!`.
        static MIGRATOR: Migrator = sqlx::migrate!("./migrations/postgres");
        &MIGRATOR
    }
}
/// SQLite backend: supplies its own migrations and overrides the two media
/// queries whose default SQL relies on `NOW()`.
#[cfg(feature = "sqlite")]
impl SupportedDatabase for sqlx::sqlite::Sqlite {
    /// Returns the migrator embedded from `./migrations/sqlite`.
    fn get_migrator() -> &'static Migrator {
        // Migrations are embedded at compile time by `sqlx::migrate!`.
        static MIGRATOR: Migrator = sqlx::migrate!("./migrations/sqlite");
        &MIGRATOR
    }

    /// Updates `last_access` of a media entry and returns its `media_data`,
    /// using `datetime(CURRENT_TIMESTAMP, 'localtime')` as the timestamp.
    ///
    /// # Arguments
    /// * `$1` - `media_url`
    fn media_load_query<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        const SQL: &str = r#"
UPDATE statestore_media
SET last_access = datetime(CURRENT_TIMESTAMP, 'localtime')
WHERE media_url = $1
RETURNING media_data
"#;
        sqlx::query(SQL)
    }

    /// Inserts a media entry stamped with
    /// `datetime(CURRENT_TIMESTAMP, 'localtime')`; does nothing if the URL is
    /// already stored.
    ///
    /// # Arguments
    /// * `$1` - `media_url`
    /// * `$2` - `media_data`
    fn media_insert_query_1<'q>() -> Query<'q, Self, <Self as HasArguments<'q>>::Arguments> {
        const SQL: &str = r#"
INSERT INTO statestore_media (media_url, media_data, last_access)
VALUES ($1, $2, datetime(CURRENT_TIMESTAMP, 'localtime'))
ON CONFLICT (media_url) DO NOTHING
"#;
        sqlx::query(SQL)
    }
}