1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// SPDX-License-Identifier: BUSL-1.1
//! Persistent lockout-state operations for the system catalog.
//!
//! The `_system.lockout_state` table stores one row per username, keyed by the
//! username string and encoded as MessagePack. The in-memory
//! `LoginAttemptTracker` cache is rebuilt from this table on startup and kept
//! in sync on every failure and success event.
use super::types::{LOCKOUT_STATE, SystemCatalog, catalog_err};
/// Re-export the table definition so `system_catalog.rs` can open it during
/// catalog initialization without reaching into `types` directly.
pub(super) use super::types::LOCKOUT_STATE as LOCKOUT_STATE_TABLE;
/// Persisted form of a user's lockout state.
///
/// Timestamps are milliseconds since the Unix epoch. `last_failure_ip` is
/// stored as the canonical text form of the IP address so it is human-readable
/// in diagnostic queries. A `None` means no IP was available (e.g. in-process
/// auth paths).
#[derive(zerompk::ToMessagePack, zerompk::FromMessagePack, Debug, Clone)]
#[msgpack(map, allow_unknown_fields)]
pub struct StoredLockoutRecord {
/// Number of consecutive failed login attempts since the last success.
pub failed_count: u32,
/// Epoch millisecond at which the lockout expires. `0` means not locked.
pub locked_until_ms: u64,
/// Epoch millisecond of the most recent failure.
pub last_failure_ms: u64,
/// Canonical text form of the IP address of the most recent failure.
#[msgpack(default)]
pub last_failure_ip: Option<String>,
}
impl SystemCatalog {
/// Upsert a lockout record for `username`.
///
/// Replaces any existing row. Called on every failure increment and on
/// success reset.
pub fn put_lockout_record(
&self,
username: &str,
record: &StoredLockoutRecord,
) -> crate::Result<()> {
let bytes =
zerompk::to_msgpack_vec(record).map_err(|e| catalog_err("serialize lockout", e))?;
let write_txn = self
.db
.begin_write()
.map_err(|e| catalog_err("lockout write txn", e))?;
{
let mut table = write_txn
.open_table(LOCKOUT_STATE)
.map_err(|e| catalog_err("open lockout_state", e))?;
table
.insert(username, bytes.as_slice())
.map_err(|e| catalog_err("put lockout", e))?;
}
write_txn
.commit()
.map_err(|e| catalog_err("lockout commit", e))?;
Ok(())
}
/// Remove the lockout record for `username`.
///
/// Called when `record_login_success` clears a user whose state no longer
/// needs tracking (failed_count = 0, not locked).
pub fn delete_lockout_record(&self, username: &str) -> crate::Result<()> {
let write_txn = self
.db
.begin_write()
.map_err(|e| catalog_err("lockout write txn", e))?;
{
let mut table = write_txn
.open_table(LOCKOUT_STATE)
.map_err(|e| catalog_err("open lockout_state", e))?;
table
.remove(username)
.map_err(|e| catalog_err("delete lockout", e))?;
}
write_txn
.commit()
.map_err(|e| catalog_err("lockout commit", e))?;
Ok(())
}
/// Load all lockout records. Used to rebuild the in-memory cache on startup.
pub fn load_all_lockout_records(&self) -> crate::Result<Vec<(String, StoredLockoutRecord)>> {
let read_txn = self
.db
.begin_read()
.map_err(|e| catalog_err("lockout read txn", e))?;
let table = read_txn
.open_table(LOCKOUT_STATE)
.map_err(|e| catalog_err("open lockout_state", e))?;
let mut out = Vec::new();
for entry in table
.range::<&str>(..)
.map_err(|e| catalog_err("range lockout_state", e))?
{
let (key, value) = entry.map_err(|e| catalog_err("read lockout", e))?;
let record: StoredLockoutRecord = match zerompk::from_msgpack(value.value()) {
Ok(r) => r,
Err(e) => {
tracing::warn!(
username = key.value(),
error = %e,
"skipping unparseable lockout record"
);
continue;
}
};
out.push((key.value().to_string(), record));
}
Ok(out)
}
/// Remove all lockout records whose `failed_count == 0` and
/// `locked_until_ms` is at or before `cutoff_ms`.
///
/// "Expired and cleared" means the lockout window has passed and there are
/// no pending failures to remember. Active lockouts are always preserved.
pub fn gc_lockout_records(&self, cutoff_ms: u64) -> crate::Result<u64> {
// Phase 1: identify keys to remove.
let to_delete = {
let read_txn = self
.db
.begin_read()
.map_err(|e| catalog_err("lockout gc read txn", e))?;
let table = read_txn
.open_table(LOCKOUT_STATE)
.map_err(|e| catalog_err("open lockout_state gc", e))?;
let mut keys = Vec::new();
for entry in table
.range::<&str>(..)
.map_err(|e| catalog_err("range lockout_state gc", e))?
{
let (key, value) = entry.map_err(|e| catalog_err("read lockout gc", e))?;
let record: StoredLockoutRecord = match zerompk::from_msgpack(value.value()) {
Ok(r) => r,
Err(_) => continue,
};
// Prune only if cleared (no pending failures) and lock window expired.
if record.failed_count == 0 && record.locked_until_ms <= cutoff_ms {
keys.push(key.value().to_string());
}
}
keys
};
if to_delete.is_empty() {
return Ok(0);
}
// Phase 2: delete.
let write_txn = self
.db
.begin_write()
.map_err(|e| catalog_err("lockout gc write txn", e))?;
{
let mut table = write_txn
.open_table(LOCKOUT_STATE)
.map_err(|e| catalog_err("open lockout_state gc write", e))?;
for key in &to_delete {
table
.remove(key.as_str())
.map_err(|e| catalog_err("gc lockout remove", e))?;
}
}
write_txn
.commit()
.map_err(|e| catalog_err("lockout gc commit", e))?;
Ok(to_delete.len() as u64)
}
}