1use crate::{
2 collections::map::HashMap, runtime, snapshot_state_observer, Applier, ApplierGuard,
3 ApplierHost, CommandQueue, Composer, CompositionPassDebugStats, ConcreteApplierHost,
4 DefaultScheduler, Key, NodeError, NodeId, RecomposeScope, RetentionPolicy, Runtime,
5 RuntimeHandle, ScopeId, SlotDebugSnapshot, SlotTable, SlotTableDebugStats, SlotsHost,
6 SnapshotStateObserver,
7};
8use std::rc::Rc;
9use std::sync::Arc;
10
11pub struct Composition<A: Applier + 'static> {
12 pub(crate) composer_state: Rc<crate::composer::ComposerRuntimeState>,
13 pub(crate) slots: Rc<SlotsHost>,
14 pub(crate) applier: Rc<ConcreteApplierHost<A>>,
15 pub(crate) runtime: Runtime,
16 pub(crate) observer: SnapshotStateObserver,
17 pub(crate) root: Option<NodeId>,
18 pub(crate) root_key: Option<Key>,
19 pub(crate) root_render_requested: bool,
20 pub(crate) last_pass_stats: CompositionPassDebugStats,
21}
22
23pub const ROOT_RENDER_REPLAY_LIMIT: usize = 100;
39
40impl<A: Applier + 'static> Composition<A> {
41 pub fn new(applier: A) -> Self {
42 Self::with_runtime(applier, Runtime::new(Arc::new(DefaultScheduler)))
43 }
44
45 pub fn with_runtime(applier: A, runtime: Runtime) -> Self {
46 let composer_state = Rc::new(crate::composer::ComposerRuntimeState::default());
47 let slots = Rc::new(SlotsHost::new(SlotTable::new()));
48 let applier = Rc::new(ConcreteApplierHost::new(applier));
49 let observer_handle = runtime.handle();
50 let observer = SnapshotStateObserver::new(move |callback| {
51 observer_handle.enqueue_ui_task(callback);
52 });
53 observer.start();
54 Self {
55 composer_state,
56 slots,
57 applier,
58 runtime,
59 observer,
60 root: None,
61 root_key: None,
62 root_render_requested: false,
63 last_pass_stats: CompositionPassDebugStats::default(),
64 }
65 }
66
67 pub fn root_key(&self) -> Option<Key> {
70 self.root_key
71 }
72
73 pub fn set_retention_policy(&self, policy: RetentionPolicy) {
74 self.composer_state.set_retention_policy(policy);
75 }
76
77 fn slots_host(&self) -> Rc<SlotsHost> {
78 Rc::clone(&self.slots)
79 }
80
81 fn applier_host(&self) -> Rc<dyn ApplierHost> {
82 self.applier.clone()
83 }
84
85 fn reset_last_pass_stats(&mut self) {
86 self.last_pass_stats = CompositionPassDebugStats::default();
87 }
88
89 fn maybe_dump_slot_table(&self, label: &str) {
90 if std::env::var_os("COMPOSE_DEBUG_SLOT_TABLE").is_none() {
91 return;
92 }
93 eprintln!(
94 "[COMPOSE_DEBUG_SLOT_TABLE] {label}\n{:#?}",
95 self.debug_slot_snapshot()
96 );
97 }
98
99 pub fn take_root_render_request(&mut self) -> bool {
100 std::mem::take(&mut self.root_render_requested)
101 }
102
103 pub fn request_root_render(&mut self) {
104 self.root_render_requested = true;
105 self.runtime.handle().schedule();
106 }
107
108 fn record_pass_stats(
109 &mut self,
110 commands: &CommandQueue,
111 side_effects: &Vec<Box<dyn FnOnce()>>,
112 ) {
113 self.last_pass_stats.commands_len = self.last_pass_stats.commands_len.max(commands.len());
114 self.last_pass_stats.commands_cap =
115 self.last_pass_stats.commands_cap.max(commands.capacity());
116 self.last_pass_stats.command_payload_len_bytes = self
117 .last_pass_stats
118 .command_payload_len_bytes
119 .max(commands.payload_len_bytes());
120 self.last_pass_stats.command_payload_cap_bytes = self
121 .last_pass_stats
122 .command_payload_cap_bytes
123 .max(commands.payload_capacity_bytes());
124 self.last_pass_stats.sync_children_len = self
125 .last_pass_stats
126 .sync_children_len
127 .max(commands.sync_children.len());
128 self.last_pass_stats.sync_children_cap = self
129 .last_pass_stats
130 .sync_children_cap
131 .max(commands.sync_children.capacity());
132 self.last_pass_stats.sync_child_ids_len = self
133 .last_pass_stats
134 .sync_child_ids_len
135 .max(commands.sync_child_ids.len());
136 self.last_pass_stats.sync_child_ids_cap = self
137 .last_pass_stats
138 .sync_child_ids_cap
139 .max(commands.sync_child_ids.capacity());
140 self.last_pass_stats.side_effects_len = self
141 .last_pass_stats
142 .side_effects_len
143 .max(side_effects.len());
144 self.last_pass_stats.side_effects_cap = self
145 .last_pass_stats
146 .side_effects_cap
147 .max(side_effects.capacity());
148 }
149
150 fn finalize_runtime_state(&mut self) {
151 let runtime_handle = self.runtime_handle();
152 self.observer.prune_dead_scopes();
153 if !self.runtime.has_updates()
154 && !runtime_handle.has_invalid_scopes()
155 && !runtime_handle.has_frame_callbacks()
156 && !runtime_handle.has_pending_ui()
157 {
158 self.runtime.set_needs_frame(false);
159 }
160 }
161
162 fn abandon_host_after_apply_failure(&mut self, host: &Rc<SlotsHost>) {
163 host.abandon_after_apply_failure();
164 if Rc::ptr_eq(host, &self.slots) {
165 self.root = None;
166 }
167 self.root_render_requested = true;
168 self.finalize_runtime_state();
169 }
170
171 fn apply_commands_and_updates_for_host(
172 &mut self,
173 host: &Rc<SlotsHost>,
174 runtime_handle: &RuntimeHandle,
175 commands: CommandQueue,
176 ) -> Result<(), NodeError> {
177 let result = {
178 let mut applier = self.applier.borrow_dyn();
179 let mut result = commands.apply(&mut *applier);
180 if result.is_ok() {
181 for update in runtime_handle.take_updates() {
182 if let Err(err) = update.apply(&mut *applier) {
183 result = Err(err);
184 break;
185 }
186 }
187 }
188 result
189 };
190 if result.is_err() {
191 self.abandon_host_after_apply_failure(host);
192 }
193 result
194 }
195
196 fn render_root_pass(&mut self, key: Key, content: &mut dyn FnMut()) -> Result<(), NodeError> {
197 self.root_key = Some(key);
198 self.root_render_requested = false;
199 let runtime_handle = self.runtime_handle();
200 runtime_handle.drain_ui();
201 let side_effects = {
202 let _teardown = runtime::enter_state_teardown_scope();
203 let composer = Composer::new_with_shared_state(
204 Rc::clone(&self.composer_state),
205 Rc::clone(&self.slots),
206 self.applier.clone(),
207 runtime_handle.clone(),
208 self.observer.clone(),
209 self.root,
210 );
211 self.observer.begin_frame();
212 let (root, commands, side_effects, compact_applier) = composer.install(|composer| {
213 let (_, outcome) = composer.try_with_slot_host_pass(
214 Rc::clone(&self.slots),
215 crate::slot::SlotPassMode::Compose,
216 |composer| composer.with_group(key, |_| content()),
217 )?;
218 let root = composer.root();
219 let commands = composer.take_commands();
220 let side_effects = composer.take_side_effects();
221 Ok((root, commands, side_effects, outcome.compacted))
222 })?;
223 self.record_pass_stats(&commands, &side_effects);
224 self.apply_commands_and_updates_for_host(
225 &Rc::clone(&self.slots),
226 &runtime_handle,
227 commands,
228 )?;
229 if compact_applier {
230 self.applier.compact();
231 self.applier.borrow_dyn().clear_recycled_nodes();
232 }
233
234 self.root = root;
235 side_effects
236 };
237 runtime_handle.drain_ui();
238 for effect in side_effects {
239 effect();
240 }
241 runtime_handle.drain_ui();
242 self.maybe_dump_slot_table("root_render_pass");
243 Ok(())
244 }
245
246 fn reconcile_with_content(
247 &mut self,
248 key: Key,
249 content: &mut dyn FnMut(),
250 ) -> Result<bool, NodeError> {
251 self.root_key = Some(key);
252 let mut did_work = false;
253 let mut root_render_replays = 0usize;
254 loop {
255 did_work |= self.process_invalid_scopes_until_root_request()?;
256 if !self.take_root_render_request() {
257 return Ok(did_work);
258 }
259
260 root_render_replays += 1;
261 if root_render_replays > ROOT_RENDER_REPLAY_LIMIT {
262 debug_assert!(
263 false,
264 "root render replay exceeded {ROOT_RENDER_REPLAY_LIMIT} iterations — reentrant render bug"
265 );
266 log::error!(
267 "root render replay looped past {ROOT_RENDER_REPLAY_LIMIT} iterations; breaking to keep UI responsive"
268 );
269 return Ok(true);
270 }
271
272 self.render_root_pass(key, content)?;
273 did_work = true;
274 }
275 }
276
277 pub fn render(&mut self, key: Key, mut content: impl FnMut()) -> Result<(), NodeError> {
278 self.reset_last_pass_stats();
279 self.render_root_pass(key, &mut content)?;
280 let _ = self.process_invalid_scopes()?;
281 Ok(())
282 }
283
284 pub fn render_stable(&mut self, key: Key, mut content: impl FnMut()) -> Result<(), NodeError> {
287 self.reset_last_pass_stats();
288 self.render_root_pass(key, &mut content)?;
289 let _ = self.reconcile_with_content(key, &mut content)?;
290 Ok(())
291 }
292
293 pub fn reconcile(&mut self, key: Key, mut content: impl FnMut()) -> Result<bool, NodeError> {
296 self.reconcile_with_content(key, &mut content)
297 }
298
299 pub fn should_render(&self) -> bool {
308 self.root_render_requested || self.runtime.needs_frame() || self.runtime.has_updates()
309 }
310
311 pub fn runtime_handle(&self) -> RuntimeHandle {
312 self.runtime.handle()
313 }
314
315 pub fn applier_mut(&mut self) -> ApplierGuard<'_, A> {
316 ApplierGuard::new(self.applier.borrow_typed())
317 }
318
319 pub fn root(&self) -> Option<NodeId> {
320 self.root
321 }
322
323 pub fn debug_dump_slot_table_groups(&self) -> Vec<(usize, Key, Option<ScopeId>, usize)> {
324 self.slots.borrow().debug_dump_groups()
325 }
326
327 pub fn debug_dump_slot_entries(&self) -> Vec<crate::SlotDebugEntry> {
328 self.slots.borrow().debug_dump_slot_entries()
329 }
330
331 pub fn slot_table_heap_bytes(&self) -> usize {
332 self.slots.borrow().heap_bytes()
333 }
334
335 pub fn debug_slot_table_stats(&self) -> SlotTableDebugStats {
336 self.slots.debug_stats()
337 }
338
339 pub fn debug_slot_snapshot(&self) -> SlotDebugSnapshot {
340 self.slots.debug_snapshot()
341 }
342
343 pub fn debug_observer_stats(&self) -> snapshot_state_observer::SnapshotStateObserverDebugStats {
344 self.observer.debug_stats()
345 }
346
347 pub fn debug_last_pass_stats(&self) -> CompositionPassDebugStats {
348 self.last_pass_stats
349 }
350
351 #[cfg(test)]
352 pub(crate) fn debug_validate_slots(&self) -> Result<(), crate::slot::SlotInvariantError> {
353 let table = self.slots.borrow();
354 table.validate()?;
355 self.composer_state
356 .validate_host_retention(self.slots.as_ref(), &table)
357 }
358
359 fn process_invalid_scopes_until_root_request(&mut self) -> Result<bool, NodeError> {
360 let runtime_handle = self.runtime_handle();
361 let mut did_recompose = false;
362 let mut loop_count = 0;
363 loop {
364 loop_count += 1;
365 if loop_count > ROOT_RENDER_REPLAY_LIMIT {
366 debug_assert!(
367 false,
368 "process_invalid_scopes exceeded {ROOT_RENDER_REPLAY_LIMIT} iterations — reentrant recomposition bug (a scope keeps re-invalidating)"
369 );
370 log::error!(
371 "process_invalid_scopes looped past {ROOT_RENDER_REPLAY_LIMIT} iterations; breaking to keep UI responsive"
372 );
373 break;
374 }
375 runtime_handle.drain_ui();
376 let pending = runtime_handle.take_invalidated_scopes();
377 if pending.is_empty() {
378 break;
379 }
380 let mut scopes = Vec::new();
381 for (id, weak) in pending {
382 if let Some(inner) = weak.upgrade() {
383 scopes.push(RecomposeScope { inner });
384 } else {
385 runtime_handle.mark_scope_recomposed(id);
386 }
387 }
388 if scopes.is_empty() {
389 continue;
390 }
391 did_recompose = true;
392 let runtime_clone = runtime_handle.clone();
393 let root_host = self.slots_host();
394 let mut scope_groups: Vec<(Rc<SlotsHost>, Vec<RecomposeScope>)> = Vec::new();
395 let mut scope_group_index: HashMap<usize, usize> = HashMap::default();
396 for scope in scopes {
397 let host = scope
398 .slots_runtime_state()
399 .and_then(|state| {
400 scope
401 .slots_storage_key()
402 .and_then(|storage_key| state.host_for_storage_key(storage_key))
403 })
404 .or_else(|| {
405 scope.slots_storage_key().and_then(|storage_key| {
406 self.composer_state.host_for_storage_key(storage_key)
407 })
408 })
409 .unwrap_or_else(|| Rc::clone(&root_host));
410 let host_key = host.storage_key();
411 if let Some(index) = scope_group_index.get(&host_key).copied() {
412 scope_groups[index].1.push(scope);
413 } else {
414 scope_group_index.insert(host_key, scope_groups.len());
415 scope_groups.push((host, vec![scope]));
416 }
417 }
418 let mut host_group_index = 0usize;
419 while host_group_index < scope_groups.len() {
420 let (host, scopes) = &scope_groups[host_group_index];
421 let shared_state = host
422 .runtime_state()
423 .or_else(|| scopes.first().and_then(RecomposeScope::slots_runtime_state))
424 .unwrap_or_else(|| Rc::clone(&self.composer_state));
425 let side_effects = {
426 let _teardown = runtime::enter_state_teardown_scope();
427 let composer = Composer::new_with_shared_state(
428 shared_state,
429 Rc::clone(host),
430 self.applier_host(),
431 runtime_clone.clone(),
432 self.observer.clone(),
433 self.root,
434 );
435 self.observer.begin_frame();
436 let (root, commands, side_effects, requested_root_render, compact_applier) =
437 composer.install(|composer| {
438 let (_, outcome) = composer.try_with_slot_host_pass(
439 Rc::clone(host),
440 crate::slot::SlotPassMode::Recompose,
441 |composer| {
442 for scope in scopes {
443 composer.recranpose_group(scope);
444 }
445 },
446 )?;
447 let root = composer.root();
448 let commands = composer.take_commands();
449 let side_effects = composer.take_side_effects();
450 let requested_root_render = composer.take_root_render_request();
451 Ok((
452 root,
453 commands,
454 side_effects,
455 requested_root_render,
456 outcome.compacted,
457 ))
458 })?;
459 self.record_pass_stats(&commands, &side_effects);
460 self.apply_commands_and_updates_for_host(host, &runtime_handle, commands)?;
461 if compact_applier {
462 self.applier.compact();
463 self.applier.borrow_dyn().clear_recycled_nodes();
464 }
465 if root.is_some() {
466 self.root = root;
467 }
468 if requested_root_render {
469 self.root_render_requested = true;
470 }
471 side_effects
472 };
473 runtime_handle.drain_ui();
474 for effect in side_effects {
475 effect();
476 }
477 runtime_handle.drain_ui();
478 self.maybe_dump_slot_table("recompose_pass");
479 if self.root_render_requested {
480 for (_, remaining_scopes) in scope_groups.iter().skip(host_group_index + 1) {
481 for scope in remaining_scopes {
482 runtime_handle.requeue_invalid_scope(scope.id(), scope.downgrade());
483 }
484 }
485 break;
486 }
487 host_group_index += 1;
488 }
489 if self.root_render_requested {
490 break;
491 }
492 }
493 self.finalize_runtime_state();
494 Ok(did_recompose)
495 }
496
497 pub fn process_invalid_scopes(&mut self) -> Result<bool, NodeError> {
498 self.process_invalid_scopes_until_root_request()
499 }
500
501 pub fn flush_pending_node_updates(&mut self) -> Result<(), NodeError> {
502 let updates = self.runtime_handle().take_updates();
503 let mut applier = self.applier.borrow_dyn();
504 for update in updates {
505 update.apply(&mut *applier)?;
506 }
507 Ok(())
508 }
509}
510
511impl<A: Applier + 'static> Drop for Composition<A> {
512 fn drop(&mut self) {
513 self.observer.stop();
514 }
515}