1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
use std::cmp::min;
use borderless::{
__private::storage_keys::{StorageKey, BASE_KEY_LOGS},
http::{queries::Pagination, PaginatedElements},
log::{LogLevel, LogLine},
prelude::Id,
};
use borderless_kv_store::*;
use serde::{Deserialize, Serialize};
use crate::log_shim::{debug, error, info, trace, warn};
use crate::{Result, AGENT_SUB_DB, CONTRACT_SUB_DB};
/// Storage key, where the meta-information about the buffer is saved
const SUB_KEY_META: u64 = u64::MAX;
/// We keep a maximum of 32k log-lines ( which should be sufficient for debugging )
const MAX_LOG_BUFFER_SIZE: u64 = 32 * 1024;
#[derive(Serialize, Deserialize, Default)]
struct BufferMeta {
start: u64,
end: u64,
/// Absolute index at which the last flush started.
last_flush_start: u64,
/// Number of log lines flushed in the last flush.
last_flush_count: u64,
}
/// Logger instance that is created over a key-value storage for a given contract-id
///
/// The logger is essentially a ring-buffer with a fixed size, that uses a specific key-space.
pub struct Logger<'a, S: Db> {
db: &'a S,
id: Id,
}
impl<'a, S: Db> Logger<'a, S> {
pub fn new(db: &'a S, id: impl Into<Id>) -> Self {
Self { db, id: id.into() }
}
/// Flushes the given log lines into the ring-buffer.
///
/// This function writes a batch of log lines into the underlying key-value storage. It performs the following steps:
///
/// 1. Reads the current buffer metadata, which includes the logical start and end indices of the stored log lines.
/// 2. Determines if adding the new log lines would exceed the fixed capacity (`MAX_LOG_BUFFER_SIZE`). If so,
/// it advances the start index to overwrite the oldest entries.
/// 3. Records the flush metadata (`last_flush_start` and `last_flush_count`) to track the range of log lines added in this flush.
/// 4. Writes the new log lines to storage using modulo arithmetic to map the logical indices to physical storage keys.
/// 5. Updates and persists the modified metadata.
///
/// # Arguments
///
/// * `lines` - A slice of `LogLine` objects to be flushed into the buffer.
///
/// # Returns
///
/// * `Ok(())` if the flush is successful.
///
/// # Errors
///
/// Returns an error if any database operation fails or if serialization/deserialization of log lines or metadata fails.
pub fn flush_lines(
&self,
lines: &[LogLine],
db_ptr: &S::Handle,
txn: &mut <S as Db>::RwTx<'_>,
) -> Result<()> {
// Retrieve meta info, or initialize it if not present.
let meta_key = StorageKey::system_key(self.id, BASE_KEY_LOGS, SUB_KEY_META);
let mut meta = match txn.read(db_ptr, &meta_key)? {
Some(bytes) => postcard::from_bytes(bytes)?,
None => {
// Initialize with flush info set to 0.
let meta = BufferMeta::default();
let bytes = postcard::to_allocvec(&meta)?;
txn.write(db_ptr, &meta_key, &bytes)?;
meta
}
};
let new_line_count = lines.len() as u64;
let current_count = meta.end - meta.start;
// If adding new lines would overflow the ring buffer, adjust the start index.
if current_count + new_line_count > MAX_LOG_BUFFER_SIZE {
let drop_count = current_count + new_line_count - MAX_LOG_BUFFER_SIZE;
meta.start += drop_count;
}
// Record the flush information: where the flush starts and how many lines are flushed.
meta.last_flush_start = meta.end;
meta.last_flush_count = new_line_count;
// Write each new log line using modulo arithmetic to wrap-around.
for (i, line) in lines.iter().enumerate() {
let index = (meta.end + i as u64) % MAX_LOG_BUFFER_SIZE;
let key = StorageKey::system_key(self.id, BASE_KEY_LOGS, index);
let bytes = postcard::to_allocvec(line)?;
txn.write(db_ptr, &key, &bytes)?;
}
// Update meta with the new end.
meta.end += new_line_count;
let meta_bytes = postcard::to_allocvec(&meta)?;
txn.write(db_ptr, &meta_key, &meta_bytes)?;
Ok(())
}
/// Retrieves the full log from the buffer in chronological order.
pub fn get_full_log(&self) -> Result<Vec<LogLine>> {
self.get_log_lines(0, MAX_LOG_BUFFER_SIZE)
}
/// Retrieves a range of log lines from the buffer in chronological order.
///
/// # Arguments
///
/// * `start_offset` - The number of log lines to skip from the oldest entry.
/// * `count` - The maximum number of log lines to retrieve.
///
/// For example, to get the 100 oldest log lines, call with start_offset = 0 and count = 100.
pub fn get_log_lines(&self, start_offset: u64, count: u64) -> Result<Vec<LogLine>> {
let db_ptr = match self.id {
Id::Contract { .. } => self.db.open_sub_db(CONTRACT_SUB_DB)?,
Id::Agent { .. } => self.db.open_sub_db(AGENT_SUB_DB)?,
};
let txn = self.db.begin_ro_txn()?;
let meta_key = StorageKey::system_key(self.id, BASE_KEY_LOGS, SUB_KEY_META);
// Read meta info; if missing, assume an empty buffer.
let meta = match txn.read(&db_ptr, &meta_key)? {
Some(bytes) => postcard::from_bytes(bytes)?,
None => BufferMeta::default(),
};
let total_count = meta.end - meta.start;
// If the requested start offset is beyond the current log count, return an empty Vec.
if start_offset >= total_count {
return Ok(Vec::new());
}
// Determine the absolute indices in the logical log buffer.
let range_start = meta.start + start_offset;
let range_end = min(range_start + count, meta.end);
let mut logs = Vec::new();
// Iterate over the specified range and fetch each log line.
for i in range_start..range_end {
// Compute the physical index using modulo arithmetic.
let index = i % MAX_LOG_BUFFER_SIZE;
let key = StorageKey::system_key(self.id, BASE_KEY_LOGS, index);
if let Some(bytes) = txn.read(&db_ptr, &key)? {
let log_line: LogLine = postcard::from_bytes(bytes)?;
logs.push(log_line);
}
}
Ok(logs)
}
/// Retrieves the log lines that were flushed in the last call to `flush_lines`.
pub fn get_last_log(&self) -> Result<Vec<LogLine>> {
let db_ptr = match self.id {
Id::Contract { .. } => self.db.open_sub_db(CONTRACT_SUB_DB)?,
Id::Agent { .. } => self.db.open_sub_db(AGENT_SUB_DB)?,
};
let txn = self.db.begin_ro_txn()?;
let meta_key = StorageKey::system_key(self.id, BASE_KEY_LOGS, SUB_KEY_META);
let meta: BufferMeta = match txn.read(&db_ptr, &meta_key)? {
Some(bytes) => postcard::from_bytes(bytes)?,
None => return Ok(Vec::new()),
};
let mut logs = Vec::new();
let flush_start = meta.last_flush_start;
let flush_count = meta.last_flush_count;
// Iterate over the range corresponding to the last flush.
for i in flush_start..(flush_start + flush_count) {
// Compute the physical index using modulo arithmetic.
let index = i % MAX_LOG_BUFFER_SIZE;
let key = StorageKey::system_key(self.id, BASE_KEY_LOGS, index);
if let Some(bytes) = txn.read(&db_ptr, &key)? {
let log_line: LogLine = postcard::from_bytes(bytes)?;
logs.push(log_line);
}
}
Ok(logs)
}
/// Returns the total number of log lines ever flushed.
///
/// Note that this number is the absolute index of the last flushed log line,
/// so if logs have been overwritten in the ring-buffer, the current log count
/// (meta.end - meta.start) may be lower.
pub fn total_log_lines(&self) -> Result<u64> {
let db_ptr = match self.id {
Id::Contract { .. } => self.db.open_sub_db(CONTRACT_SUB_DB)?,
Id::Agent { .. } => self.db.open_sub_db(AGENT_SUB_DB)?,
};
let txn = self.db.begin_ro_txn()?;
let meta_key = StorageKey::system_key(self.id, BASE_KEY_LOGS, SUB_KEY_META);
// If meta is missing, we assume no logs have been flushed yet.
let meta = match txn.read(&db_ptr, &meta_key)? {
Some(bytes) => postcard::from_bytes(bytes)?,
None => BufferMeta::default(),
};
Ok(meta.end)
}
// TODO: Add 'reverse' option
/// Retrieves log lines for the given page and the total number of pages.
pub fn get_logs_paginated(&self, pagination: Pagination) -> Result<PaginatedElements<LogLine>> {
let page = pagination.page as u64;
let per_page = pagination.per_page as u64;
let db_ptr = match self.id {
Id::Contract { .. } => self.db.open_sub_db(CONTRACT_SUB_DB)?,
Id::Agent { .. } => self.db.open_sub_db(AGENT_SUB_DB)?,
};
let txn = self.db.begin_ro_txn()?;
let meta_key = StorageKey::system_key(self.id, BASE_KEY_LOGS, SUB_KEY_META);
// Retrieve meta information. If not found, assume an empty buffer.
let meta = match txn.read(&db_ptr, &meta_key)? {
Some(bytes) => postcard::from_bytes(bytes)?,
None => BufferMeta {
start: 0,
end: 0,
last_flush_start: 0,
last_flush_count: 0,
},
};
// Calculate the total number of log lines currently in the ring-buffer.
let total_count = meta.end - meta.start;
// Calculate total pages using ceiling division.
let total_pages = if total_count == 0 {
0
} else {
(total_count + per_page - 1) / per_page
};
let total_elements = (total_pages * per_page) as usize;
// Calculate the logical start and end indices for the requested page.
let page_start = meta.start + page.saturating_sub(1) * per_page;
// If the start index is beyond the end of the stored logs, return an empty Vec.
if page_start >= meta.end {
return Ok(PaginatedElements {
elements: Vec::new(),
total_elements,
pagination,
});
}
let page_end = std::cmp::min(meta.start + page * per_page, meta.end);
// Retrieve the logs for the calculated range.
let mut logs = Vec::new();
for i in page_start..page_end {
// Map the logical index to the physical index in the ring-buffer.
let physical_index = i % MAX_LOG_BUFFER_SIZE;
let key = StorageKey::system_key(self.id, BASE_KEY_LOGS, physical_index);
if let Some(bytes) = txn.read(&db_ptr, &key)? {
let log_line: LogLine = postcard::from_bytes(bytes)?;
logs.push(log_line);
}
}
Ok(PaginatedElements {
elements: logs,
total_elements,
pagination,
})
}
}
/// Just prints a log line to stdout
///
/// Ignores the timestamp
pub fn print_log_line(line: LogLine) {
let msg = line.msg;
match line.level {
LogLevel::Trace => trace!("{msg}"),
LogLevel::Debug => debug!("{msg}"),
LogLevel::Info => info!("{msg}"),
LogLevel::Warn => warn!("{msg}"),
LogLevel::Error => error!("{msg}"),
}
}