1use crate::{
2 collections::map::HashMap, debug_scope_invalidation_sources, debug_scope_label, runtime,
3 snapshot_state_observer, Applier, ApplierGuard, ApplierHost, CommandQueue, Composer,
4 CompositionPassDebugStats, ConcreteApplierHost, DefaultScheduler, Key, NodeError, NodeId,
5 RecomposeScope, RetentionPolicy, Runtime, RuntimeHandle, ScopeId, SlotDebugSnapshot, SlotTable,
6 SlotTableDebugStats, SlotsHost, SnapshotStateObserver,
7};
8use std::rc::Rc;
9use std::sync::Arc;
10use std::time::Instant;
11
12pub struct Composition<A: Applier + 'static> {
13 pub(crate) composer_state: Rc<crate::composer::ComposerRuntimeState>,
14 pub(crate) slots: Rc<SlotsHost>,
15 pub(crate) applier: Rc<ConcreteApplierHost<A>>,
16 pub(crate) runtime: Runtime,
17 pub(crate) observer: SnapshotStateObserver,
18 pub(crate) root: Option<NodeId>,
19 pub(crate) root_key: Option<Key>,
20 pub(crate) root_render_requested: bool,
21 pub(crate) last_pass_stats: CompositionPassDebugStats,
22}
23
/// Maximum number of iterations allowed for the root render replay loop and for the
/// invalid-scope processing loop; exceeding it is reported as
/// `NodeError::RecompositionLimitExceeded`.
pub const ROOT_RENDER_REPLAY_LIMIT: usize = 100;
40
/// Reads the recompose-scope telemetry threshold, in milliseconds, from the
/// `CRANPOSE_RECOMPOSE_SCOPE_TELEMETRY_MS` environment variable.
///
/// Returns `None` when the variable is unset or not valid Unicode, when it does not parse
/// as an `f64`, or when the parsed value is negative or not finite (`inf`, `NaN`).
///
/// Surrounding whitespace is trimmed before parsing: `f64::from_str` rejects it, so a value
/// such as `"5 "` would otherwise silently disable telemetry.
fn recompose_scope_telemetry_threshold_ms() -> Option<f64> {
    let raw = std::env::var("CRANPOSE_RECOMPOSE_SCOPE_TELEMETRY_MS").ok()?;
    raw.trim()
        .parse::<f64>()
        .ok()
        .filter(|threshold_ms| threshold_ms.is_finite() && *threshold_ms >= 0.0)
}
47
48impl<A: Applier + 'static> Composition<A> {
49 pub fn new(applier: A) -> Self {
50 Self::with_runtime(applier, Runtime::new(Arc::new(DefaultScheduler)))
51 }
52
53 pub fn with_runtime(applier: A, runtime: Runtime) -> Self {
54 let composer_state = Rc::new(crate::composer::ComposerRuntimeState::default());
55 let slots = Rc::new(SlotsHost::new(SlotTable::new()));
56 let applier = Rc::new(ConcreteApplierHost::new(applier));
57 let observer_handle = runtime.handle();
58 let observer = SnapshotStateObserver::new(move |callback| {
59 observer_handle.enqueue_ui_task(callback);
60 });
61 observer.start();
62 Self {
63 composer_state,
64 slots,
65 applier,
66 runtime,
67 observer,
68 root: None,
69 root_key: None,
70 root_render_requested: false,
71 last_pass_stats: CompositionPassDebugStats::default(),
72 }
73 }
74
75 pub fn root_key(&self) -> Option<Key> {
78 self.root_key
79 }
80
81 pub fn set_retention_policy(&self, policy: RetentionPolicy) {
82 self.composer_state.set_retention_policy(policy);
83 }
84
85 fn slots_host(&self) -> Rc<SlotsHost> {
86 Rc::clone(&self.slots)
87 }
88
89 fn applier_host(&self) -> Rc<dyn ApplierHost> {
90 self.applier.clone()
91 }
92
93 fn reset_last_pass_stats(&mut self) {
94 self.last_pass_stats = CompositionPassDebugStats::default();
95 }
96
97 fn maybe_dump_slot_table(&self, label: &str) {
98 if std::env::var_os("COMPOSE_DEBUG_SLOT_TABLE").is_none() {
99 return;
100 }
101 eprintln!(
102 "[COMPOSE_DEBUG_SLOT_TABLE] {label}\n{:#?}",
103 self.debug_slot_snapshot()
104 );
105 }
106
107 pub fn take_root_render_request(&mut self) -> bool {
108 std::mem::take(&mut self.root_render_requested)
109 }
110
111 pub fn request_root_render(&mut self) {
112 self.root_render_requested = true;
113 self.runtime.handle().schedule();
114 }
115
116 fn record_pass_stats(
117 &mut self,
118 commands: &CommandQueue,
119 side_effects: &Vec<Box<dyn FnOnce()>>,
120 ) {
121 self.last_pass_stats.commands_len = self.last_pass_stats.commands_len.max(commands.len());
122 self.last_pass_stats.commands_cap =
123 self.last_pass_stats.commands_cap.max(commands.capacity());
124 self.last_pass_stats.command_payload_len_bytes = self
125 .last_pass_stats
126 .command_payload_len_bytes
127 .max(commands.payload_len_bytes());
128 self.last_pass_stats.command_payload_cap_bytes = self
129 .last_pass_stats
130 .command_payload_cap_bytes
131 .max(commands.payload_capacity_bytes());
132 self.last_pass_stats.sync_children_len = self
133 .last_pass_stats
134 .sync_children_len
135 .max(commands.sync_children.len());
136 self.last_pass_stats.sync_children_cap = self
137 .last_pass_stats
138 .sync_children_cap
139 .max(commands.sync_children.capacity());
140 self.last_pass_stats.sync_child_ids_len = self
141 .last_pass_stats
142 .sync_child_ids_len
143 .max(commands.sync_child_ids.len());
144 self.last_pass_stats.sync_child_ids_cap = self
145 .last_pass_stats
146 .sync_child_ids_cap
147 .max(commands.sync_child_ids.capacity());
148 self.last_pass_stats.side_effects_len = self
149 .last_pass_stats
150 .side_effects_len
151 .max(side_effects.len());
152 self.last_pass_stats.side_effects_cap = self
153 .last_pass_stats
154 .side_effects_cap
155 .max(side_effects.capacity());
156 }
157
158 fn finalize_runtime_state(&mut self) {
159 let runtime_handle = self.runtime_handle();
160 self.observer.prune_dead_scopes();
161 if !self.runtime.has_updates()
162 && !runtime_handle.has_invalid_scopes()
163 && !runtime_handle.has_frame_callbacks()
164 && !runtime_handle.has_pending_ui()
165 {
166 self.runtime.set_needs_frame(false);
167 }
168 }
169
170 fn abandon_host_after_apply_failure(&mut self, host: &Rc<SlotsHost>) {
171 host.abandon_after_apply_failure();
172 if Rc::ptr_eq(host, &self.slots) {
173 self.root = None;
174 }
175 self.root_render_requested = true;
176 self.finalize_runtime_state();
177 }
178
179 fn apply_commands_and_updates_for_host(
180 &mut self,
181 host: &Rc<SlotsHost>,
182 runtime_handle: &RuntimeHandle,
183 commands: CommandQueue,
184 ) -> Result<(), NodeError> {
185 let result = {
186 let mut applier = self.applier.borrow_dyn();
187 let mut result = commands.apply(&mut *applier);
188 if result.is_ok() {
189 for update in runtime_handle.take_updates() {
190 if let Err(err) = update.apply(&mut *applier) {
191 result = Err(err);
192 break;
193 }
194 }
195 }
196 result
197 };
198 if result.is_err() {
199 self.abandon_host_after_apply_failure(host);
200 }
201 result
202 }
203
204 fn render_root_pass(&mut self, key: Key, content: &mut dyn FnMut()) -> Result<(), NodeError> {
205 self.root_key = Some(key);
206 self.root_render_requested = false;
207 let runtime_handle = self.runtime_handle();
208 runtime_handle.drain_ui();
209 let side_effects = {
210 let _teardown = runtime::enter_state_teardown_scope();
211 let composer = Composer::new_with_shared_state(
212 Rc::clone(&self.composer_state),
213 Rc::clone(&self.slots),
214 self.applier.clone(),
215 runtime_handle.clone(),
216 self.observer.clone(),
217 self.root,
218 );
219 self.observer.begin_frame();
220 let (root, commands, side_effects, compact_applier) = composer.install(|composer| {
221 let (_, outcome) = composer.try_with_slot_host_pass(
222 Rc::clone(&self.slots),
223 crate::slot::SlotPassMode::Compose,
224 |composer| composer.with_group(key, |_| content()),
225 )?;
226 let root = composer.root();
227 let commands = composer.take_commands();
228 let side_effects = composer.take_side_effects();
229 Ok((root, commands, side_effects, outcome.compacted))
230 })?;
231 self.record_pass_stats(&commands, &side_effects);
232 self.apply_commands_and_updates_for_host(
233 &Rc::clone(&self.slots),
234 &runtime_handle,
235 commands,
236 )?;
237 if compact_applier {
238 self.applier.compact();
239 self.applier.borrow_dyn().clear_recycled_nodes();
240 }
241
242 self.root = root;
243 side_effects
244 };
245 runtime_handle.drain_ui();
246 for effect in side_effects {
247 effect();
248 }
249 runtime_handle.drain_ui();
250 self.maybe_dump_slot_table("root_render_pass");
251 Ok(())
252 }
253
254 fn reconcile_with_content(
255 &mut self,
256 key: Key,
257 content: &mut dyn FnMut(),
258 ) -> Result<bool, NodeError> {
259 self.root_key = Some(key);
260 let mut did_work = false;
261 let mut root_render_replays = 0usize;
262 loop {
263 did_work |= self.process_invalid_scopes_until_root_request()?;
264 if !self.take_root_render_request() {
265 return Ok(did_work);
266 }
267
268 root_render_replays += 1;
269 if root_render_replays > ROOT_RENDER_REPLAY_LIMIT {
270 log::error!(
271 "root render replay looped past {ROOT_RENDER_REPLAY_LIMIT} iterations; breaking to keep UI responsive"
272 );
273 return Err(NodeError::RecompositionLimitExceeded {
274 operation: "root render replay",
275 limit: ROOT_RENDER_REPLAY_LIMIT,
276 });
277 }
278
279 self.render_root_pass(key, content)?;
280 did_work = true;
281 }
282 }
283
284 pub fn render(&mut self, key: Key, mut content: impl FnMut()) -> Result<(), NodeError> {
285 self.reset_last_pass_stats();
286 self.render_root_pass(key, &mut content)?;
287 let _ = self.process_invalid_scopes()?;
288 Ok(())
289 }
290
291 pub fn render_stable(&mut self, key: Key, mut content: impl FnMut()) -> Result<(), NodeError> {
294 self.reset_last_pass_stats();
295 self.render_root_pass(key, &mut content)?;
296 let _ = self.reconcile_with_content(key, &mut content)?;
297 Ok(())
298 }
299
300 pub fn reconcile(&mut self, key: Key, mut content: impl FnMut()) -> Result<bool, NodeError> {
303 self.reconcile_with_content(key, &mut content)
304 }
305
306 pub fn should_render(&self) -> bool {
315 self.root_render_requested || self.runtime.needs_frame() || self.runtime.has_updates()
316 }
317
318 pub fn runtime_handle(&self) -> RuntimeHandle {
319 self.runtime.handle()
320 }
321
322 pub fn applier_mut(&mut self) -> ApplierGuard<'_, A> {
323 ApplierGuard::new(self.applier.borrow_typed())
324 }
325
326 pub fn root(&self) -> Option<NodeId> {
327 self.root
328 }
329
330 pub fn debug_dump_slot_table_groups(&self) -> Vec<(usize, Key, Option<ScopeId>, usize)> {
331 self.slots.borrow().debug_dump_groups()
332 }
333
334 pub fn debug_dump_slot_entries(&self) -> Vec<crate::SlotDebugEntry> {
335 self.slots.borrow().debug_dump_slot_entries()
336 }
337
338 pub fn slot_table_heap_bytes(&self) -> usize {
339 self.slots.borrow().heap_bytes()
340 }
341
342 pub fn debug_slot_table_stats(&self) -> SlotTableDebugStats {
343 self.slots.debug_stats()
344 }
345
346 pub fn debug_slot_snapshot(&self) -> SlotDebugSnapshot {
347 self.slots.debug_snapshot()
348 }
349
350 pub fn debug_observer_stats(&self) -> snapshot_state_observer::SnapshotStateObserverDebugStats {
351 self.observer.debug_stats()
352 }
353
354 pub fn debug_last_pass_stats(&self) -> CompositionPassDebugStats {
355 self.last_pass_stats
356 }
357
/// Test-only check: validates the root slot table, then asks the composer state to
/// validate host retention for the root slot host against that table.
///
/// # Errors
///
/// Returns the first invariant violation reported by either validation.
#[cfg(test)]
pub(crate) fn debug_validate_slots(&self) -> Result<(), crate::slot::SlotInvariantError> {
    let slot_table = self.slots.borrow();
    slot_table.validate()?;
    let shared_state = &self.composer_state;
    shared_state.validate_host_retention(self.slots.as_ref(), &slot_table)
}
365
366 fn process_invalid_scopes_until_root_request(&mut self) -> Result<bool, NodeError> {
367 let runtime_handle = self.runtime_handle();
368 let mut did_recompose = false;
369 let mut loop_count = 0;
370 loop {
371 loop_count += 1;
372 if loop_count > ROOT_RENDER_REPLAY_LIMIT {
373 log::error!(
374 "process_invalid_scopes looped past {ROOT_RENDER_REPLAY_LIMIT} iterations; breaking to keep UI responsive"
375 );
376 return Err(NodeError::RecompositionLimitExceeded {
377 operation: "process_invalid_scopes",
378 limit: ROOT_RENDER_REPLAY_LIMIT,
379 });
380 }
381 runtime_handle.drain_ui();
382 let pending = runtime_handle.take_invalidated_scopes();
383 if pending.is_empty() {
384 break;
385 }
386 let mut scopes = Vec::new();
387 for (id, weak) in pending {
388 if let Some(inner) = weak.upgrade() {
389 scopes.push(RecomposeScope { inner });
390 } else {
391 runtime_handle.mark_scope_recomposed(id);
392 }
393 }
394 if scopes.is_empty() {
395 continue;
396 }
397 did_recompose = true;
398 let runtime_clone = runtime_handle.clone();
399 let root_host = self.slots_host();
400 let mut scope_groups: Vec<(Rc<SlotsHost>, Vec<RecomposeScope>)> = Vec::new();
401 let mut scope_group_index: HashMap<usize, usize> = HashMap::default();
402 for scope in scopes {
403 let host = scope
404 .slots_runtime_state()
405 .and_then(|state| {
406 scope
407 .slots_storage_key()
408 .and_then(|storage_key| state.host_for_storage_key(storage_key))
409 })
410 .or_else(|| {
411 scope.slots_storage_key().and_then(|storage_key| {
412 self.composer_state.host_for_storage_key(storage_key)
413 })
414 })
415 .unwrap_or_else(|| Rc::clone(&root_host));
416 let host_key = host.storage_key();
417 if let Some(index) = scope_group_index.get(&host_key).copied() {
418 scope_groups[index].1.push(scope);
419 } else {
420 scope_group_index.insert(host_key, scope_groups.len());
421 scope_groups.push((host, vec![scope]));
422 }
423 }
424 let mut host_group_index = 0usize;
425 while host_group_index < scope_groups.len() {
426 let (host, scopes) = &scope_groups[host_group_index];
427 let scope_telemetry_threshold_ms = recompose_scope_telemetry_threshold_ms();
428 let shared_state = host
429 .runtime_state()
430 .or_else(|| scopes.first().and_then(RecomposeScope::slots_runtime_state))
431 .unwrap_or_else(|| Rc::clone(&self.composer_state));
432 let side_effects = {
433 let _teardown = runtime::enter_state_teardown_scope();
434 let composer = Composer::new_with_shared_state(
435 shared_state,
436 Rc::clone(host),
437 self.applier_host(),
438 runtime_clone.clone(),
439 self.observer.clone(),
440 self.root,
441 );
442 composer.parent_stack().clear();
445 self.observer.begin_frame();
446 let (root, commands, side_effects, requested_root_render, compact_applier) =
447 composer.install(|composer| {
448 let (_, outcome) = composer.try_with_slot_host_pass(
449 Rc::clone(host),
450 crate::slot::SlotPassMode::Recompose,
451 |composer| {
452 for scope in scopes {
453 if let Some(threshold_ms) = scope_telemetry_threshold_ms {
454 let start = Instant::now();
455 composer.recranpose_group(scope);
456 let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
457 if elapsed_ms >= threshold_ms {
458 eprintln!(
459 "[recompose-scope-telemetry] scope_id={} label={:?} elapsed_ms={elapsed_ms:.3} invalidation_sources={:?}",
460 scope.id(),
461 debug_scope_label(scope.id()),
462 debug_scope_invalidation_sources(scope.id())
463 );
464 }
465 } else {
466 composer.recranpose_group(scope);
467 }
468 }
469 },
470 )?;
471 let root = composer.root();
472 let commands = composer.take_commands();
473 let side_effects = composer.take_side_effects();
474 let requested_root_render = composer.take_root_render_request();
475 Ok((
476 root,
477 commands,
478 side_effects,
479 requested_root_render,
480 outcome.compacted,
481 ))
482 })?;
483 self.record_pass_stats(&commands, &side_effects);
484 self.apply_commands_and_updates_for_host(host, &runtime_handle, commands)?;
485 if compact_applier {
486 self.applier.compact();
487 self.applier.borrow_dyn().clear_recycled_nodes();
488 }
489 if root.is_some() {
490 self.root = root;
491 }
492 if requested_root_render {
493 self.root_render_requested = true;
494 }
495 side_effects
496 };
497 runtime_handle.drain_ui();
498 for effect in side_effects {
499 effect();
500 }
501 runtime_handle.drain_ui();
502 self.maybe_dump_slot_table("recompose_pass");
503 if self.root_render_requested {
504 for (_, remaining_scopes) in scope_groups.iter().skip(host_group_index + 1) {
505 for scope in remaining_scopes {
506 runtime_handle.requeue_invalid_scope(scope.id(), scope.downgrade());
507 }
508 }
509 break;
510 }
511 host_group_index += 1;
512 }
513 if self.root_render_requested {
514 break;
515 }
516 }
517 self.finalize_runtime_state();
518 Ok(did_recompose)
519 }
520
521 pub fn process_invalid_scopes(&mut self) -> Result<bool, NodeError> {
522 self.process_invalid_scopes_until_root_request()
523 }
524
525 pub fn flush_pending_node_updates(&mut self) -> Result<(), NodeError> {
526 let updates = self.runtime_handle().take_updates();
527 let mut applier = self.applier.borrow_dyn();
528 for update in updates {
529 update.apply(&mut *applier)?;
530 }
531 Ok(())
532 }
533}
534
535impl<A: Applier + 'static> Drop for Composition<A> {
536 fn drop(&mut self) {
537 self.observer.stop();
538 }
539}