1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
//! Owns event placement: splicing a new event onto the block whose leaf is
//! its parent (promoting `Standalone` → `Module` as needed), or — when
//! splicing isn't possible — walking ancestors to build a fresh chain up to
//! the policy's hop and size limits.
use nmp_core::substrate::{EventId, KernelEvent};
use crate::block::TimelineBlock;
use crate::pointer::ThreadPointer;
use crate::resolver::ParentResolver;
use super::collapse::{gap_between, root_id_mismatched};
use super::{GroupDelta, Grouper};
impl<R: ParentResolver> Grouper<R> {
pub(super) fn place_event(&mut self, event: &KernelEvent) -> Option<GroupDelta> {
if self.seen.contains(&event.id) {
return None;
}
let parent = self.resolver.parent(event);
let root_hint = self.resolver.root(event);
// Case A: parent is an Event in store → try to splice onto the block
// whose leaf is that parent (promoting Standalone → Module as
// needed). If extension would exceed `max_module_size`, fall through
// to Case B to spawn a new block.
if let Some(ThreadPointer::Event { id: parent_id, .. }) = &parent {
if !self.by_id.contains_key(parent_id) || self.orphaned.contains(parent_id) {
// Parent isn't placed yet (unknown locally, or buffered
// awaiting its own parent). Buffer this child too — it
// stitches in when the chain settles top-down.
self.orphans
.entry(parent_id.clone())
.or_default()
.insert(event.id.clone());
self.orphaned.insert(event.id.clone());
self.pending_ancestor_ids.insert(parent_id.clone());
return None;
}
if let Some(idx) = self.find_block_with_leaf(parent_id) {
let parent_kev = self.by_id.get(parent_id).cloned();
let extended =
self.try_extend_block(idx, event, parent_kev.as_ref(), root_hint.as_ref());
if extended {
self.seen.insert(event.id.clone());
self.orphaned.remove(&event.id);
self.pending_ancestor_ids.remove(&event.id);
return Some(GroupDelta::BlockReplaced(idx));
}
}
}
// Case B: build a fresh chain by walking ancestors.
let (chain, terminal_root, has_gap) = self.walk_chain(event, parent.as_ref(), root_hint);
for id in &chain {
self.seen.insert(id.clone());
self.orphaned.remove(id);
self.pending_ancestor_ids.remove(id);
}
// `walk_chain` always seeds the chain with `event.id`, so it is
// non-empty in practice. If that invariant is ever violated we
// degrade silently (skip placement) rather than panic across the
// public API boundary.
let block = match chain.as_slice() {
// A length-1 chain is still a reply when `terminal_root` is
// set (parent absent / leaf taken / max_module_size hit). Carry
// the root so the block is not mistaken for a thread root.
[_] => TimelineBlock::Standalone {
id: chain.into_iter().next()?,
root: terminal_root,
},
[] => return None,
_ => TimelineBlock::Module {
events: chain,
has_gap,
root: terminal_root,
},
};
self.blocks.insert(0, block);
Some(GroupDelta::BlockInserted(0))
}
/// Try to splice `event` onto the block at `idx` whose leaf is its
/// parent. Returns true on success (block in-place mutated); false when
/// `max_module_size` is exceeded (caller falls back to a fresh block).
fn try_extend_block(
&mut self,
idx: usize,
event: &KernelEvent,
parent_kev: Option<&KernelEvent>,
root_hint: Option<&ThreadPointer>,
) -> bool {
let max_size = self.policy.max_module_size as usize;
let gap_threshold = self.policy.max_lookback_gap_secs;
let leaf_gap = gap_between(parent_kev, Some(event), gap_threshold);
match &mut self.blocks[idx] {
TimelineBlock::Standalone { id: parent_id, .. } => {
if max_size < 2 {
return false;
}
let mismatched = root_id_mismatched(root_hint, parent_id.as_str());
let promoted = TimelineBlock::Module {
events: vec![parent_id.clone(), event.id.clone()],
has_gap: leaf_gap || mismatched,
root: root_hint.cloned(),
};
self.blocks[idx] = promoted;
true
}
TimelineBlock::Module {
events,
has_gap,
root,
} => {
if events.len() >= max_size {
return false;
}
events.push(event.id.clone());
*has_gap = *has_gap || leaf_gap;
if root.is_none() {
*root = root_hint.cloned();
}
// Mismatched root: chain top is not the declared root id.
// `events` was just pushed to above, so `first()` is `Some`
// in practice; the `if let` keeps a panic off the public
// API path if that ever stops holding.
if let Some(top) = events.first() {
if root_id_mismatched(root.as_ref(), top) {
*has_gap = true;
}
}
true
}
}
}
/// Find a block whose leaf (last event) equals `parent_id`. Walks both
/// Standalone and Module blocks.
fn find_block_with_leaf(&self, parent_id: &str) -> Option<usize> {
self.blocks.iter().position(|b| match b {
TimelineBlock::Standalone { id, .. } => id == parent_id,
TimelineBlock::Module { events, .. } => {
events.last().is_some_and(|leaf| leaf == parent_id)
}
})
}
/// Walk up to `max_ancestor_hops` from `event`. Returns the chain in
/// root-first order (oldest first), the terminal root pointer (if non-
/// Event), and whether a gap was detected.
fn walk_chain(
&mut self,
event: &KernelEvent,
initial_parent: Option<&ThreadPointer>,
root_hint: Option<ThreadPointer>,
) -> (Vec<EventId>, Option<ThreadPointer>, bool) {
let mut chain: Vec<EventId> = vec![event.id.clone()];
let mut has_gap = false;
let mut terminal_root: Option<ThreadPointer> = None;
let max_size = self.policy.max_module_size as usize;
let max_hops = self.policy.max_ancestor_hops as usize;
let mut cursor: Option<ThreadPointer> = initial_parent.cloned();
let mut hops_used = 0usize;
while let Some(ptr) = cursor.take() {
if hops_used >= max_hops {
if !matches!(ptr, ThreadPointer::Event { .. }) {
terminal_root = Some(ptr.clone());
} else if let ThreadPointer::Event { id, .. } = &ptr {
if !self.by_id.contains_key(id) {
has_gap = true;
self.pending_ancestor_ids.insert(id.clone());
}
}
break;
}
match ptr {
ThreadPointer::Event { id, .. } => {
if self.seen.contains(&id) || self.orphaned.contains(&id) {
// Parent already lives in another block, or it's
// itself buffered awaiting its own parent. Either
// way we do not steal it — adjacent-root collapse
// or top-down orphan replay will reconcile.
has_gap = true;
break;
}
let Some(parent_event) = self.by_id.get(&id).cloned() else {
has_gap = true;
self.pending_ancestor_ids.insert(id.clone());
break;
};
// `chain` is seeded non-empty and only ever grows, so
// `first()` is `Some` in practice. The `if let` keeps a
// panic off the public API path; the gap check is purely
// additive, so skipping it on an empty chain is safe.
if let Some(child_id) = chain.first() {
let child = self.by_id.get(child_id);
if gap_between(
Some(&parent_event),
child,
self.policy.max_lookback_gap_secs,
) {
has_gap = true;
}
}
chain.insert(0, id.clone());
if chain.len() >= max_size {
break;
}
cursor = self.resolver.parent(&parent_event);
hops_used += 1;
}
other => {
terminal_root = Some(other);
break;
}
}
}
// Mismatched-root detection: chain top is not the declared root id.
// `chain` is non-empty in practice; the `if let` keeps a panic off
// the public API path, and this diagnostic is purely additive.
if let Some(ThreadPointer::Event { id: rid, .. }) =
terminal_root.as_ref().or(root_hint.as_ref())
{
if let Some(top) = chain.first() {
if top != rid {
has_gap = true;
}
}
}
// Adopt root_hint when nothing terminal was hit (used purely for
// adjacent-block collapse).
if terminal_root.is_none() {
terminal_root = root_hint;
}
(chain, terminal_root, has_gap)
}
}