1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// SPDX-License-Identifier: BUSL-1.1
//! ChangeStream post-apply side effects — sync the in-memory
//! `stream_registry`, tear down the CDC router buffer on drop, and
//! cascade consumer-group state cleanup.
use std::sync::Arc;
use tracing::warn;
use crate::control::state::SharedState;
use crate::event::cdc::stream_def::ChangeStreamDef;
pub fn put(stored: ChangeStreamDef, shared: Arc<SharedState>) {
super::owner::install_from_parent(
"change_stream",
stored.tenant_id,
&stored.name,
&stored.owner,
&shared,
);
shared.stream_registry.register(stored);
}
pub fn delete(tenant_id: u64, name: String, shared: Arc<SharedState>) {
// 1. Drop the stream def from the in-memory registry so no new
// events are routed to it.
shared.stream_registry.unregister(tenant_id, &name);
// 2. Drop the per-stream retention buffer from the CDC router.
shared.cdc_router.remove_buffer(tenant_id, &name);
// 3. Cascade consumer-group teardown. Every group scoped to this
// stream must have its in-memory registry entry dropped AND
// its persisted offset state wiped — otherwise a `CREATE
// CHANGE STREAM` with the same name after a drop would
// resume from a stale consumer-group offset and silently
// skip real events.
let groups = shared.group_registry.list_for_stream(tenant_id, &name);
for def in &groups {
shared
.group_registry
.unregister(tenant_id, &name, &def.name);
if let Err(e) = shared
.offset_store
.delete_group(tenant_id, &name, &def.name)
{
warn!(
tenant = tenant_id,
stream = %name,
group = %def.name,
error = %e,
"failed to delete persisted consumer-group offsets on stream drop"
);
}
}
shared
.permissions
.install_replicated_remove_owner("change_stream", tenant_id, &name);
}