1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// Copyright 2020 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::models::StoredMessage;
use time::OffsetDateTime;
use tracing::debug;
#[derive(Clone)]
pub struct InboxManager {
connection_pool: sqlx::SqlitePool,
/// Maximum number of messages that can be obtained from the database per operation.
/// It is used to prevent out of memory errors in the case of client receiving a lot of data while
/// offline and then loading it all at once when he comes back online.
retrieval_limit: i64,
}
impl InboxManager {
/// Creates new instance of the `InboxManager` with the provided sqlite connection pool.
///
/// # Arguments
///
/// * `connection_pool`: database connection pool to use.
pub(crate) fn new(connection_pool: sqlx::SqlitePool, mut retrieval_limit: i64) -> Self {
// TODO: make this into a hard error instead
if retrieval_limit == 0 {
retrieval_limit = 100;
}
InboxManager {
connection_pool,
retrieval_limit,
}
}
/// Inserts new message to the storage for an offline client for future retrieval.
///
/// # Arguments
///
/// * `client_address_bs58`: base58-encoded address of the client
/// * `content`: raw content of the message to store.
pub(crate) async fn insert_message(
&self,
client_address_bs58: &str,
content: Vec<u8>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO message_store(client_address_bs58, content) VALUES (?, ?)",
client_address_bs58,
content,
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
/// Retrieves messages stored for the particular client specified by the provided address.
///
/// It also respects the specified retrieval limit. If there are more messages stored than allowed
/// by the limit, it returns id of the last message retrieved to indicate start of the next query.
///
/// # Arguments
///
/// * `client_address_bs58`: base58-encoded address of the client
/// * `start_after`: optional starting id of the messages to grab
///
/// returns the retrieved messages alongside optional id of the last message retrieved if
/// there are more messages to retrieve.
pub(crate) async fn get_messages(
&self,
client_address_bs58: &str,
start_after: Option<i64>,
) -> Result<(Vec<StoredMessage>, Option<i64>), sqlx::Error> {
// get 1 additional message to check whether there will be more to grab
// next time
let limit = self.retrieval_limit + 1;
let start_after = start_after.unwrap_or(-1);
let mut res: Vec<StoredMessage> = sqlx::query_as(
r#"
SELECT id, client_address_bs58, content, timestamp
FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
)
.bind(client_address_bs58)
.bind(start_after)
.bind(limit)
.fetch_all(&self.connection_pool)
.await?;
if res.len() > self.retrieval_limit as usize {
res.truncate(self.retrieval_limit as usize);
// given retrieval_limit > 0, unwrap will not fail
#[allow(clippy::unwrap_used)]
let start_after = res.last().unwrap().id;
Ok((res, Some(start_after)))
//
} else {
Ok((res, None))
}
}
/// Removes message with the specified id
///
/// # Arguments
///
/// * `id`: id of the message to remove
pub(crate) async fn remove_message(&self, id: i64) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM message_store WHERE id = ?", id)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub(crate) async fn remove_messages_for_client(
&self,
client_address_bs58: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM message_store WHERE client_address_bs58 = ?",
client_address_bs58
)
.execute(&self.connection_pool)
.await?;
Ok(())
}
pub async fn remove_stale(&self, cutoff: OffsetDateTime) -> Result<(), sqlx::Error> {
let affected = sqlx::query!("DELETE FROM message_store WHERE timestamp < ?", cutoff)
.execute(&self.connection_pool)
.await?
.rows_affected();
debug!("Removed {affected} stale messages");
Ok(())
}
}