1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// SPDX-License-Identifier: BUSL-1.1
//! Tenant data purge handler.
//!
//! Deletes ALL data for a tenant across every engine and cache on this
//! Data Plane core. Called via `MetaOp::PurgeTenant`.
//!
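//! Purge order: persistent storage first (sparse, edges, inverted index) —
//! the graph's in-memory CSR partition and deleted-nodes tracker are dropped
//! right after the edge store — then the remaining in-memory state (vectors,
//! timeseries, KV, CRDT, spatial, caches, per-collection configs, chain
//! hashes, sparse vector indexes, columnar engines and flushed segments).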
//! Idempotent: safe to re-run after a crash (missing data is a no-op).
use tracing::{info, warn};
use crate::bridge::envelope::{ErrorCode, Response};
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::task::ExecutionTask;
use crate::types::TenantId;
impl CoreLoop {
    /// Purge all data for a tenant across every engine and cache on this core.
    ///
    /// Dispatched by the Control Plane via `MetaOp::PurgeTenant` through the
    /// SPSC bridge.
    ///
    /// # Parameters
    ///
    /// * `task` - the execution task being answered; it is used to build the
    ///   response and supplies the database id for the document-cache eviction.
    /// * `tenant_id` - raw numeric id of the tenant whose data is removed.
    ///
    /// # Returns
    ///
    /// On success, a payload response carrying a JSON summary with one removal
    /// counter per engine. If the summary cannot be encoded, the purge has
    /// still been performed and a bare OK response is returned instead.
    ///
    /// # Failure handling
    ///
    /// * A sparse-engine failure aborts the purge immediately and yields an
    ///   `ErrorCode::Internal` response; nothing after step 1 has run.
    /// * Edge-store and inverted-index failures are logged at `warn` level,
    ///   counted as `0` in the summary, and the purge continues.
    ///
    /// NOTE(review): in the second case the caller receives a success response
    /// even though persistent edge / full-text data may remain. Confirm that
    /// this best-effort behavior is intended, or that the Control Plane
    /// re-issues the purge.
    pub(in crate::data::executor) fn execute_purge_tenant(
        &mut self,
        task: &ExecutionTask,
        tenant_id: u64,
    ) -> Response {
        info!(core = self.core_id, tenant_id, "starting tenant purge");

        // 1. Sparse engine: documents + secondary indexes (persistent, redb).
        //    This is the only step whose failure aborts the whole purge.
        let (docs, idxs) = match self.sparse.delete_all_for_tenant(tenant_id) {
            Ok(counts) => counts,
            Err(e) => {
                warn!(tenant_id, error = %e, "sparse purge failed");
                return self.response_error(
                    task,
                    ErrorCode::Internal {
                        detail: format!("sparse purge: {e}"),
                    },
                );
            }
        };

        // Typed tenant key shared by every by-reference lookup and `retain`
        // predicate below. Calls that take a `TenantId` by value still build
        // their own, so this code does not depend on `TenantId` being `Copy`.
        let tid_key = TenantId::new(tenant_id);

        // 2. Graph engine: edges in redb. Failure is logged and counted as 0.
        let edges = match self.edge_store.purge_tenant(TenantId::new(tenant_id)) {
            Ok(n) => n,
            Err(e) => {
                warn!(tenant_id, error = %e, "edge store purge failed");
                0
            }
        };

        // CSR in-memory index: drop the tenant's partition outright.
        self.csr.drop_partition(TenantId::new(tenant_id));

        // Deleted-nodes tracker: drop the whole tenant bucket.
        self.deleted_nodes.remove(&tid_key);

        // 3. Inverted index (fulltext): postings + doc_lengths (persistent,
        //    redb). Failure is logged and counted as 0.
        let inv = match self.inverted.purge_tenant(TenantId::new(tenant_id)) {
            Ok(n) => n,
            Err(e) => {
                warn!(tenant_id, error = %e, "inverted index purge failed");
                0
            }
        };

        // 4. Vector engine: drop every collection keyed by this tenant, plus
        //    the per-collection params, index configs and IVF indexes. The
        //    reported count is the number of collections removed.
        let vec_removed = {
            let before = self.vector_collections.len();
            self.vector_collections.retain(|(t, _), _| *t != tid_key);
            self.vector_params.retain(|(t, _), _| *t != tid_key);
            self.index_configs.retain(|(t, _), _| *t != tid_key);
            self.ivf_indexes.retain(|(t, _), _| *t != tid_key);
            before - self.vector_collections.len()
        };

        // 5. Timeseries: memtables + partition registries and their
        //    bookkeeping. The reported count is the number of memtables removed.
        let ts_removed = {
            let before = self.columnar_memtables.len();
            self.columnar_memtables.retain(|(t, _), _| *t != tid_key);
            self.columnar_memtable_mem.retain(|(t, _), _| *t != tid_key);
            self.ts_registries.retain(|(t, _), _| *t != tid_key);
            self.ts_max_ingested_lsn.retain(|(t, _), _| *t != tid_key);
            self.ts_last_value_caches.retain(|(t, _), _| *t != tid_key);
            before - self.columnar_memtables.len()
        };

        // 6. KV engine: remove all tenant hash tables.
        let kv_removed = self.kv_engine.purge_tenant(tenant_id);

        // 7. CRDT engine: remove tenant state; counts 1 if an engine existed,
        //    0 otherwise.
        let crdt_removed = u32::from(self.crdt_engines.remove(&tid_key).is_some());

        // 8. Spatial indexes: remove tenant-scoped entries. The reported count
        //    is the number of spatial indexes removed.
        let spatial_removed = {
            let before = self.spatial_indexes.len();
            self.spatial_indexes.retain(|(t, _, _), _| *t != tid_key);
            self.spatial_doc_map.retain(|(t, _, _, _), _| *t != tid_key);
            before - self.spatial_indexes.len()
        };

        // 9. Caches: evict all tenant data. The document cache is evicted for
        //    the database id carried by the request.
        self.doc_cache
            .evict_tenant(task.request.database_id.as_u64(), tenant_id);
        self.aggregate_cache.retain(|(t, _), _| *t != tid_key);

        // 10. Remaining tenant-keyed state: collection configs, chain hashes,
        //     sparse vector indexes, columnar engines and flushed segments.
        self.doc_configs.retain(|(t, _), _| *t != tid_key);
        self.chain_hashes.retain(|(t, _), _| *t != tid_key);
        self.sparse_vector_indexes
            .retain(|(t, _, _), _| *t != tid_key);
        self.columnar_engines.retain(|(t, _), _| *t != tid_key);
        self.columnar_flushed_segments
            .retain(|(t, _), _| *t != tid_key);

        info!(
            core = self.core_id,
            tenant_id,
            docs,
            idxs,
            edges,
            inv,
            vec_removed,
            ts_removed,
            kv_removed,
            crdt_removed,
            spatial_removed,
            "tenant purge complete"
        );

        let summary = serde_json::json!({
            "tenant_id": tenant_id,
            "documents_removed": docs,
            "indexes_removed": idxs,
            "edges_removed": edges,
            "inverted_entries_removed": inv,
            "vector_collections_removed": vec_removed,
            "timeseries_collections_removed": ts_removed,
            "kv_tables_removed": kv_removed,
            "crdt_engines_removed": crdt_removed,
            "spatial_indexes_removed": spatial_removed,
        });

        match crate::data::executor::response_codec::encode_json(&summary) {
            Ok(payload) => self.response_with_payload(task, payload),
            Err(_) => {
                // The purge itself has completed; only the summary is lost.
                // Leave a trace instead of degrading silently.
                warn!(
                    tenant_id,
                    "failed to encode tenant purge summary; returning bare OK"
                );
                self.response_ok(task)
            }
        }
    }
}