Skip to main content

myko/server/
relationship_manager.rs

1//! Cell-based RelationshipManager for handling entity relationship cascades.
2//!
3//! This module handles cascade operations based on relationships registered via
4//! `#[belongs_to]`, `#[owns_many]`, and `#[ensure_for]` attribute macros.
5//!
6//! Uses CellServerCtx for queries and event publishing, keeping this module
7//! decoupled from direct store and event processor access.
8//!
9//! # Relationship Types
10//!
11//! ## BelongsTo (Foreign Key)
12//!
13//! A child entity has a foreign key pointing to a parent. When the parent is deleted,
14//! all children with matching foreign keys are cascade-deleted.
15//!
16//! ```text
17//! use myko::prelude::*;
18//! use std::sync::Arc;
19//!
20//! #[myko_item]
21//! pub struct Scene {
22//!     pub name: String,
23//! }
24//!
25//! #[myko_item]
26//! pub struct Binding {
27//!     #[belongs_to(Scene)]
28//!     pub scope_id: Arc<str>,
29//! }
30//! ```
31//!
32//! ## OwnsMany (Parent has array of child IDs)
33//!
34//! A parent entity owns an array of child IDs. Deleting the parent deletes all children.
35//! Deleting a child removes its ID from the parent's array.
36//!
37//! ```text
38//! use myko::prelude::*;
39//! use std::sync::Arc;
40//!
41//! #[myko_item]
42//! pub struct BindingNode {
43//!     pub name: String,
44//! }
45//!
46//! #[myko_item]
47//! pub struct Scene {
48//!     #[owns_many(BindingNode)]
49//!     pub node_ids: Vec<Arc<str>>,
50//! }
51//! ```
52//!
53//! ## EnsureFor (Auto-create for combinations)
54//!
55//! Automatically create one entity for each combination of dependency entities.
56//!
57//! ```text
58//! use myko::prelude::*;
59//! use std::sync::Arc;
60//!
61//! #[myko_item]
62//! pub struct Session {
63//!     pub name: String,
64//! }
65//!
66//! #[myko_item]
67//! pub struct Bundle {
68//!     pub name: String,
69//! }
70//!
71//! #[myko_item]
72//! pub struct BundleStatus {
73//!     #[ensure_for(Session)]
74//!     pub session_id: Arc<str>,
75//!     #[ensure_for(Bundle)]
76//!     pub bundle_id: Arc<str>,
77//! }
78//! ```
79
80use std::{
81    collections::{BTreeSet, HashMap, HashSet},
82    sync::Arc,
83};
84
85use dashmap::DashMap;
86use hyphae::Gettable;
87use log::{debug, info, trace};
88
89use super::{CellServerCtx, persister::PersistError};
90use crate::{
91    core::item::AnyItem,
92    event::EventOptions,
93    relationship::{
94        ArrayExtractor, ArrayRemover, EnsureForDependency, EntityFactory, FkExtractor, Relation,
95        iter_relations,
96    },
97};
98
/// Lookup info for BelongsTo cascades.
///
/// One value is built per registered `Relation::BelongsTo` and stored (cloned)
/// in both the by-foreign and by-local lookup tables of the manager.
#[derive(Clone)]
struct BelongsToLookup {
    /// Sequential id assigned at registration time; keys the reverse
    /// belongs_to indexes (`lookup_id -> ...`).
    id: u64,
    /// Entity type of the child, i.e. the side that holds the foreign key.
    local_type: &'static str,
    /// Entity type of the parent the foreign key points at.
    foreign_type: &'static str,
    /// Extracts the foreign-key value (the parent id) from a child item;
    /// yields `None` when no key can be extracted.
    extract_fk: FkExtractor,
}
107
/// Lookup info for OwnsMany cascades.
///
/// One value is built per registered `Relation::OwnsMany` and stored (cloned)
/// in both the by-local and by-foreign lookup tables of the manager.
#[derive(Clone)]
struct OwnsManyLookup {
    /// Entity type of the parent, i.e. the side that holds the array of child ids.
    local_type: &'static str,
    /// Entity type of the owned children.
    foreign_type: &'static str,
    /// Extracts the owned child ids from a parent item; yields `None` when
    /// nothing can be extracted.
    extract_ids: ArrayExtractor,
    /// Given a parent item and a child id, yields an updated parent with that
    /// id removed from its array, or `None` when no update is produced.
    remove_id: ArrayRemover,
}
116
/// Lookup info for EnsureFor cascades.
///
/// The same lookup is stored once under every dependency type it lists.
#[derive(Clone)]
struct EnsureForLookup {
    /// Entity type of the derived entity that must exist.
    local_type: &'static str,
    /// All dependencies of the derived entity (the full set, not just the one
    /// this lookup is filed under).
    dependencies: Vec<EnsureForDependency>,
    /// Factory that builds a derived entity from one dependency combination.
    make_entity: EntityFactory,
}
124
/// Cell-based RelationshipManager for handling entity relationship cascades.
///
/// This manager discovers relationships via [`inventory`] at initialization,
/// builds lookup indexes for efficient cascade processing, and provides
/// methods for processing events and establishing relations on startup.
///
/// Unlike the actor-based version, this implementation uses CellServerCtx
/// for queries and event publishing, keeping it decoupled from direct
/// store and event processor access.
///
/// The `HashMap` lookup tables are filled once in `new` and only read
/// afterwards; the two `DashMap` reverse indexes are mutated through `&self`
/// as SET/DEL events are forwarded.
pub struct RelationshipManager {
    /// BelongsTo relations indexed by foreign_type (the parent type)
    /// When a parent is deleted, look up children to cascade delete
    belongs_to_by_foreign: HashMap<&'static str, Vec<BelongsToLookup>>,

    /// BelongsTo relations indexed by local_type (the child type)
    /// Used for orphan cleanup on startup and for maintaining the reverse
    /// indexes when a child is set or deleted
    belongs_to_by_local: HashMap<&'static str, Vec<BelongsToLookup>>,

    /// OwnsMany relations indexed by local_type (the parent type)
    /// When a parent is deleted, delete all owned children
    owns_many_by_local: HashMap<&'static str, Vec<OwnsManyLookup>>,

    /// OwnsMany relations indexed by foreign_type (the child type)
    /// When a child is deleted, update parent arrays
    owns_many_by_foreign: HashMap<&'static str, Vec<OwnsManyLookup>>,

    /// EnsureFor relations indexed by their dependency types
    /// When a dependency entity is created, ensure derived entities exist
    ensure_for_by_dependency: HashMap<&'static str, Vec<EnsureForLookup>>,

    /// Reverse belongs_to index: lookup_id -> parent_id -> child_ids
    /// (`lookup_id` is `BelongsToLookup::id`)
    belongs_to_children_by_parent: DashMap<u64, DashMap<Arc<str>, BTreeSet<Arc<str>>>>,

    /// Reverse belongs_to index: lookup_id -> child_id -> parent_id
    /// Presence of a `lookup_id` key here is what marks that relation's
    /// reverse index as loaded
    belongs_to_parent_by_child: DashMap<u64, DashMap<Arc<str>, Arc<str>>>,
}
161
162impl RelationshipManager {
163    /// Create a new RelationshipManager with lookup tables built from inventory.
164    pub fn new() -> Self {
165        trace!("RelationshipManager: Initializing from inventory");
166
167        let mut belongs_to_by_foreign: HashMap<&'static str, Vec<BelongsToLookup>> = HashMap::new();
168        let mut belongs_to_by_local: HashMap<&'static str, Vec<BelongsToLookup>> = HashMap::new();
169        let mut owns_many_by_local: HashMap<&'static str, Vec<OwnsManyLookup>> = HashMap::new();
170        let mut owns_many_by_foreign: HashMap<&'static str, Vec<OwnsManyLookup>> = HashMap::new();
171        let mut ensure_for_by_dependency: HashMap<&'static str, Vec<EnsureForLookup>> =
172            HashMap::new();
173
174        let mut next_belongs_to_id = 1u64;
175        for registration in iter_relations() {
176            match &registration.relation {
177                Relation::BelongsTo {
178                    local_type,
179                    foreign_type,
180                    extract_fk,
181                    ..
182                } => {
183                    trace!(
184                        "RelationshipManager: Registered BelongsTo {} -> {}",
185                        local_type, foreign_type
186                    );
187                    let lookup = BelongsToLookup {
188                        id: next_belongs_to_id,
189                        local_type,
190                        foreign_type,
191                        extract_fk: *extract_fk,
192                    };
193                    next_belongs_to_id += 1;
194                    belongs_to_by_foreign
195                        .entry(foreign_type)
196                        .or_default()
197                        .push(lookup.clone());
198                    belongs_to_by_local
199                        .entry(local_type)
200                        .or_default()
201                        .push(lookup);
202                }
203                Relation::OwnsMany {
204                    local_type,
205                    foreign_type,
206                    extract_ids,
207                    remove_id,
208                    ..
209                } => {
210                    trace!(
211                        "RelationshipManager: Registered OwnsMany {} ->> {}",
212                        local_type, foreign_type
213                    );
214                    let lookup = OwnsManyLookup {
215                        local_type,
216                        foreign_type,
217                        extract_ids: *extract_ids,
218                        remove_id: *remove_id,
219                    };
220                    owns_many_by_local
221                        .entry(local_type)
222                        .or_default()
223                        .push(lookup.clone());
224                    owns_many_by_foreign
225                        .entry(foreign_type)
226                        .or_default()
227                        .push(lookup);
228                }
229                Relation::EnsureFor {
230                    local_type,
231                    dependencies,
232                    make_entity,
233                    ..
234                } => {
235                    trace!(
236                        "RelationshipManager: Registered EnsureFor {} for {:?}",
237                        local_type,
238                        dependencies
239                            .iter()
240                            .map(|d| d.foreign_type)
241                            .collect::<Vec<_>>()
242                    );
243                    let deps: Vec<_> = dependencies.to_vec();
244
245                    // Index by each dependency type
246                    for dep in dependencies.iter() {
247                        ensure_for_by_dependency
248                            .entry(dep.foreign_type)
249                            .or_default()
250                            .push(EnsureForLookup {
251                                local_type,
252                                dependencies: deps.clone(),
253                                make_entity: *make_entity,
254                            });
255                    }
256                }
257            }
258        }
259
260        let relation_count =
261            belongs_to_by_foreign.len() + owns_many_by_local.len() + ensure_for_by_dependency.len();
262        trace!(
263            "RelationshipManager: {} relation types indexed",
264            relation_count
265        );
266
267        Self {
268            belongs_to_by_foreign,
269            belongs_to_by_local,
270            owns_many_by_local,
271            owns_many_by_foreign,
272            ensure_for_by_dependency,
273            belongs_to_children_by_parent: DashMap::new(),
274            belongs_to_parent_by_child: DashMap::new(),
275        }
276    }
277
278    /// Forward a SET event for relationship processing.
279    ///
280    /// Handles EnsureFor: when a dependency entity is created, ensures
281    /// all derived entities exist for all combinations.
282    pub fn forward_set(
283        &self,
284        item: Arc<dyn AnyItem>,
285        ctx: &CellServerCtx,
286    ) -> Result<(), PersistError> {
287        let item_type = item.entity_type();
288
289        if let Some(lookups) = self.belongs_to_by_local.get(item_type) {
290            for lookup in lookups {
291                self.index_belongs_to_child(lookup, &item);
292            }
293        }
294
295        // Handle EnsureFor (dependency created → ensure derived entities exist)
296        if self.ensure_for_by_dependency.contains_key(item_type) {
297            self.handle_ensure_for(&item, ctx)?;
298        }
299
300        Ok(())
301    }
302
303    /// Forward a DEL event for relationship processing.
304    ///
305    /// Handles:
306    /// - BelongsTo cascade deletes (parent deleted → delete children)
307    /// - OwnsMany parent deletes (parent deleted → delete owned children)
308    /// - OwnsMany child deletes (child deleted → update parent arrays)
309    pub fn forward_del(
310        &self,
311        item: Arc<dyn AnyItem>,
312        ctx: &CellServerCtx,
313    ) -> Result<(), PersistError> {
314        // Handle BelongsTo cascades (parent deleted → delete children)
315        self.handle_belongs_to_cascade(&item, ctx)?;
316
317        // Handle OwnsMany parent deleted → delete owned children
318        self.handle_owns_many_parent_delete(&item, ctx)?;
319
320        // Handle OwnsMany child deleted → update parent arrays
321        self.handle_owns_many_child_delete(&item, ctx)?;
322
323        if let Some(lookups) = self.belongs_to_by_local.get(item.entity_type()) {
324            for lookup in lookups {
325                self.remove_belongs_to_child(lookup, &item.id());
326            }
327        }
328
329        Ok(())
330    }
331
332    /// Forward a batch of DEL events for relationship processing.
333    ///
334    /// Items should all be the same entity type. This keeps cascade deletes grouped
335    /// so downstream stores and views can process one wider delete wave instead of
336    /// thousands of tiny per-parent cascades.
337    pub fn forward_del_batch(
338        &self,
339        items: &[Arc<dyn AnyItem>],
340        ctx: &CellServerCtx,
341    ) -> Result<(), PersistError> {
342        if items.is_empty() {
343            return Ok(());
344        }
345
346        self.handle_belongs_to_cascade_batch(items, ctx)?;
347        self.handle_owns_many_parent_delete_batch(items, ctx)?;
348
349        for item in items {
350            self.handle_owns_many_child_delete(item, ctx)?;
351
352            if let Some(lookups) = self.belongs_to_by_local.get(item.entity_type()) {
353                for lookup in lookups {
354                    self.remove_belongs_to_child(lookup, &item.id());
355                }
356            }
357        }
358
359        Ok(())
360    }
361
362    /// Establish relations on startup (called after durable backend catchup).
363    ///
364    /// This performs:
365    /// 1. BelongsTo orphan cleanup: Delete children pointing to non-existent parents
366    /// 2. OwnsMany orphan cleanup: Delete children not referenced by any parent
367    /// 3. EnsureFor initialization: Create missing entities for all dependency combinations
368    pub fn establish_relations(&self, ctx: &CellServerCtx) -> Result<(), PersistError> {
369        info!("RelationshipManager: Establishing relations on startup");
370        trace!(
371            "RelationshipManager: BelongsTo relations by local: {:?}",
372            self.belongs_to_by_local.keys().collect::<Vec<_>>()
373        );
374        debug!(
375            "RelationshipManager: OwnsMany relations by local: {:?}",
376            self.owns_many_by_local.keys().collect::<Vec<_>>()
377        );
378
379        // 1. Orphan cleanup for BelongsTo relationships
380        self.cleanup_belongs_to_orphans(ctx)?;
381
382        // 2. Orphan cleanup for OwnsMany relationships
383        self.cleanup_owns_many_orphans(ctx)?;
384
385        // 3. EnsureFor initialization
386        self.initialize_ensure_for(ctx)?;
387
388        info!("RelationshipManager: Relations established");
389        Ok(())
390    }
391
392    // ─────────────────────────────────────────────────────────────────────────────
393    // Cascade handlers
394    // ─────────────────────────────────────────────────────────────────────────────
395
396    /// Handle BelongsTo cascades: when a parent is deleted, delete all children
397    fn handle_belongs_to_cascade(
398        &self,
399        item: &Arc<dyn AnyItem>,
400        ctx: &CellServerCtx,
401    ) -> Result<(), PersistError> {
402        let item_type = item.entity_type();
403        let Some(lookups) = self.belongs_to_by_foreign.get(item_type) else {
404            return Ok(());
405        };
406
407        let parent_id = item.id();
408
409        for lookup in lookups {
410            // Find children whose FK matches the deleted parent ID using extractor
411            let children = self.find_children_by_fk(ctx, lookup, &parent_id);
412            if children.is_empty() {
413                continue;
414            }
415
416            trace!(
417                "RelationshipManager: Cascade delete batch {} count={} (parent {} deleted)",
418                lookup.local_type,
419                children.len(),
420                &parent_id
421            );
422            self.publish_del_cascade_batch(ctx, &children)?;
423        }
424
425        Ok(())
426    }
427
428    fn handle_belongs_to_cascade_batch(
429        &self,
430        items: &[Arc<dyn AnyItem>],
431        ctx: &CellServerCtx,
432    ) -> Result<(), PersistError> {
433        let Some(first) = items.first() else {
434            return Ok(());
435        };
436        let item_type = first.entity_type();
437        let Some(lookups) = self.belongs_to_by_foreign.get(item_type) else {
438            return Ok(());
439        };
440
441        let parent_ids: Vec<Arc<str>> = items.iter().map(|item| item.id()).collect();
442
443        for lookup in lookups {
444            let mut children_by_id: HashMap<Arc<str>, Arc<dyn AnyItem>> = HashMap::new();
445            for parent_id in &parent_ids {
446                for child in self.find_children_by_fk(ctx, lookup, parent_id) {
447                    children_by_id.entry(child.id()).or_insert(child);
448                }
449            }
450
451            if children_by_id.is_empty() {
452                continue;
453            }
454
455            let children: Vec<_> = children_by_id.into_values().collect();
456            trace!(
457                "RelationshipManager: Cascade delete batch {} count={} ({} parents deleted)",
458                lookup.local_type,
459                children.len(),
460                parent_ids.len()
461            );
462            self.publish_del_cascade_batch(ctx, &children)?;
463        }
464
465        Ok(())
466    }
467
468    /// Find children whose FK matches a given parent ID
469    fn find_children_by_fk(
470        &self,
471        ctx: &CellServerCtx,
472        lookup: &BelongsToLookup,
473        parent_id: &str,
474    ) -> Vec<Arc<dyn AnyItem>> {
475        self.ensure_belongs_to_index_loaded(ctx, lookup);
476        if let Some(parent_map) = self.belongs_to_children_by_parent.get(&lookup.id) {
477            let store = ctx.registry.get_or_create(lookup.local_type);
478            let Some(child_ids) = parent_map.get(parent_id) else {
479                return Vec::new();
480            };
481            return child_ids
482                .iter()
483                .filter_map(|child_id| store.get_value(child_id))
484                .collect();
485        }
486
487        let store = ctx.registry.get_or_create(lookup.local_type);
488        store
489            .entries()
490            .get()
491            .into_iter()
492            .filter(|(_, item)| {
493                (lookup.extract_fk)(item.as_any())
494                    .map(|fk| fk.as_ref() == parent_id)
495                    .unwrap_or(false)
496            })
497            .map(|(_, item)| item)
498            .collect()
499    }
500
501    fn index_belongs_to_child(&self, lookup: &BelongsToLookup, item: &Arc<dyn AnyItem>) {
502        let child_id = item.id();
503        self.remove_belongs_to_child(lookup, &child_id);
504
505        let Some(parent_id) = (lookup.extract_fk)(item.as_any()) else {
506            return;
507        };
508
509        self.belongs_to_parent_by_child
510            .entry(lookup.id)
511            .or_default()
512            .insert(child_id.clone(), parent_id.clone());
513        self.belongs_to_children_by_parent
514            .entry(lookup.id)
515            .or_default()
516            .entry(parent_id)
517            .or_default()
518            .insert(child_id);
519    }
520
521    fn remove_belongs_to_child(&self, lookup: &BelongsToLookup, child_id: &Arc<str>) {
522        let Some(parent_map) = self.belongs_to_parent_by_child.get(&lookup.id) else {
523            return;
524        };
525        let Some((_, parent_id)) = parent_map.remove(child_id) else {
526            return;
527        };
528
529        let Some(children_by_parent) = self.belongs_to_children_by_parent.get(&lookup.id) else {
530            return;
531        };
532        let should_remove_parent = children_by_parent
533            .get_mut(parent_id.as_ref())
534            .map(|mut child_ids| {
535                child_ids.remove(child_id);
536                child_ids.is_empty()
537            })
538            .unwrap_or(false);
539
540        if should_remove_parent {
541            children_by_parent.remove(parent_id.as_ref());
542        }
543    }
544
545    fn ensure_belongs_to_index_loaded(&self, ctx: &CellServerCtx, lookup: &BelongsToLookup) {
546        if self.belongs_to_parent_by_child.contains_key(&lookup.id) {
547            return;
548        }
549
550        let child_index = DashMap::<Arc<str>, Arc<str>>::new();
551        let parent_index = DashMap::<Arc<str>, BTreeSet<Arc<str>>>::new();
552        let store = ctx.registry.get_or_create(lookup.local_type);
553
554        for (_, item) in store.snapshot() {
555            let Some(parent_id) = (lookup.extract_fk)(item.as_any()) else {
556                continue;
557            };
558            let child_id = item.id();
559            child_index.insert(child_id.clone(), parent_id.clone());
560            parent_index.entry(parent_id).or_default().insert(child_id);
561        }
562
563        let _ = self
564            .belongs_to_parent_by_child
565            .insert(lookup.id, child_index);
566        let _ = self
567            .belongs_to_children_by_parent
568            .insert(lookup.id, parent_index);
569    }
570
571    /// Handle OwnsMany parent delete: delete all owned children
572    fn handle_owns_many_parent_delete(
573        &self,
574        item: &Arc<dyn AnyItem>,
575        ctx: &CellServerCtx,
576    ) -> Result<(), PersistError> {
577        let item_type = item.entity_type();
578        let Some(lookups) = self.owns_many_by_local.get(item_type) else {
579            return Ok(());
580        };
581
582        for lookup in lookups {
583            // Extract child IDs using the typed extractor
584            let child_ids = match (lookup.extract_ids)(item.as_any()) {
585                Some(ids) => ids,
586                None => continue,
587            };
588
589            if child_ids.is_empty() {
590                continue;
591            }
592
593            let mut children = Vec::new();
594            for child_id in &child_ids {
595                if self.get_by_id(ctx, lookup.foreign_type, child_id).is_some()
596                    && let Some(child) = self.get_by_id(ctx, lookup.foreign_type, child_id)
597                {
598                    children.push(child);
599                }
600            }
601
602            if children.is_empty() {
603                continue;
604            }
605
606            trace!(
607                "RelationshipManager: Cascade delete owned batch {} count={}",
608                lookup.foreign_type,
609                children.len()
610            );
611            self.publish_del_cascade_batch(ctx, &children)?;
612        }
613
614        Ok(())
615    }
616
617    fn handle_owns_many_parent_delete_batch(
618        &self,
619        items: &[Arc<dyn AnyItem>],
620        ctx: &CellServerCtx,
621    ) -> Result<(), PersistError> {
622        let Some(first) = items.first() else {
623            return Ok(());
624        };
625        let item_type = first.entity_type();
626        let Some(lookups) = self.owns_many_by_local.get(item_type) else {
627            return Ok(());
628        };
629
630        for lookup in lookups {
631            let mut child_ids = BTreeSet::new();
632            for item in items {
633                if let Some(ids) = (lookup.extract_ids)(item.as_any()) {
634                    child_ids.extend(ids);
635                }
636            }
637
638            if child_ids.is_empty() {
639                continue;
640            }
641
642            let mut children = Vec::new();
643            for child_id in &child_ids {
644                if let Some(child) = self.get_by_id(ctx, lookup.foreign_type, child_id) {
645                    children.push(child);
646                }
647            }
648
649            if children.is_empty() {
650                continue;
651            }
652
653            trace!(
654                "RelationshipManager: Cascade delete owned batch {} count={} ({} parents deleted)",
655                lookup.foreign_type,
656                children.len(),
657                items.len()
658            );
659            self.publish_del_cascade_batch(ctx, &children)?;
660        }
661
662        Ok(())
663    }
664
665    /// Handle OwnsMany child delete: remove child ID from parent arrays
666    fn handle_owns_many_child_delete(
667        &self,
668        item: &Arc<dyn AnyItem>,
669        ctx: &CellServerCtx,
670    ) -> Result<(), PersistError> {
671        let item_type = item.entity_type();
672        let Some(lookups) = self.owns_many_by_foreign.get(item_type) else {
673            return Ok(());
674        };
675
676        let child_id = item.id();
677
678        for lookup in lookups {
679            // Find parents that contain this child ID using extract_ids
680            let parents = self.find_parents_containing(ctx, lookup, &child_id);
681            let mut updates = Vec::new();
682
683            for parent_item in parents {
684                // Use the remove_id extractor to get updated parent as Value
685                if let Some(updated_parent) = (lookup.remove_id)(parent_item.as_any(), &child_id) {
686                    trace!(
687                        "RelationshipManager: Updating {} {} to remove child {}",
688                        lookup.local_type,
689                        parent_item.id(),
690                        child_id
691                    );
692                    updates.push(updated_parent);
693                }
694            }
695
696            if !updates.is_empty() {
697                self.publish_set_cascade_batch(ctx, &updates)?;
698            }
699        }
700
701        Ok(())
702    }
703
704    /// Find parents whose owned array contains a given child ID
705    fn find_parents_containing(
706        &self,
707        ctx: &CellServerCtx,
708        lookup: &OwnsManyLookup,
709        child_id: &str,
710    ) -> Vec<Arc<dyn AnyItem>> {
711        let store = ctx.registry.get_or_create(lookup.local_type);
712        store
713            .entries()
714            .get()
715            .into_iter()
716            .filter(|(_, item)| {
717                (lookup.extract_ids)(item.as_any())
718                    .map(|ids| ids.iter().any(|id| id.as_ref() == child_id))
719                    .unwrap_or(false)
720            })
721            .map(|(_, item)| item)
722            .collect()
723    }
724
725    /// Handle EnsureFor: when dependency created, ensure derived entities exist
726    fn handle_ensure_for(
727        &self,
728        item: &Arc<dyn AnyItem>,
729        ctx: &CellServerCtx,
730    ) -> Result<(), PersistError> {
731        let item_type = item.entity_type();
732        let Some(lookups) = self.ensure_for_by_dependency.get(item_type) else {
733            return Ok(());
734        };
735
736        for lookup in lookups {
737            // Get all combinations of dependency entities
738            let combinations = self.get_dependency_combinations(ctx, &lookup.dependencies);
739
740            // Snapshot the store once outside the combo loop to avoid
741            // re-materializing entries() for every combination
742            let store = ctx.registry.get_or_create(lookup.local_type);
743            let existing_items = store.snapshot();
744
745            for combo in combinations {
746                // Check if derived entity already exists
747                let existing =
748                    Self::find_ensure_for_entity_in(&existing_items, &lookup.dependencies, &combo);
749
750                if existing.is_none() {
751                    // Create the derived entity using the factory
752                    let entity = (lookup.make_entity)(&combo);
753
754                    trace!(
755                        "RelationshipManager: Creating ensured {} for {:?}",
756                        lookup.local_type, combo
757                    );
758
759                    self.publish_set_cascade(ctx, lookup.local_type, entity)?;
760                }
761            }
762        }
763
764        Ok(())
765    }
766
767    // ─────────────────────────────────────────────────────────────────────────────
768    // Orphan cleanup
769    // ─────────────────────────────────────────────────────────────────────────────
770
771    /// Cleanup orphaned children for BelongsTo relationships
772    fn cleanup_belongs_to_orphans(&self, ctx: &CellServerCtx) -> Result<(), PersistError> {
773        trace!(
774            "RelationshipManager: cleanup_belongs_to_orphans - checking {} child types",
775            self.belongs_to_by_local.len()
776        );
777
778        for (child_type, lookups) in &self.belongs_to_by_local {
779            trace!(
780                "RelationshipManager: Checking BelongsTo orphans for child type '{}' ({} lookups)",
781                child_type,
782                lookups.len()
783            );
784
785            for lookup in lookups {
786                // Get all parent IDs that exist
787                let parents = self.get_all_items(ctx, lookup.foreign_type);
788                let parent_ids: HashSet<Arc<str>> = parents.iter().map(|p| p.id()).collect();
789
790                trace!(
791                    "RelationshipManager: {} -> {}: Found {} parents in store",
792                    child_type,
793                    lookup.foreign_type,
794                    parents.len()
795                );
796
797                // Get all children and find orphans using typed extractor
798                let children = self.get_all_items(ctx, child_type);
799                trace!(
800                    "RelationshipManager: {} -> {}: Found {} children in store",
801                    child_type,
802                    lookup.foreign_type,
803                    children.len()
804                );
805
806                let mut orphan_count = 0;
807                let mut valid_count = 0;
808                let mut no_fk_count = 0;
809
810                for child in &children {
811                    // Use the typed extractor to get the FK value
812                    if let Some(fk_value) = (lookup.extract_fk)(child.as_any()) {
813                        if !parent_ids.contains(&fk_value) {
814                            debug!(
815                                "RelationshipManager: ORPHAN {} {} has FK '{}' but parent {} not found (have {} parent IDs)",
816                                child_type,
817                                child.id(),
818                                fk_value,
819                                lookup.foreign_type,
820                                parent_ids.len()
821                            );
822                            self.publish_del_cascade(ctx, child_type, &child.id())?;
823                            orphan_count += 1;
824                        } else {
825                            valid_count += 1;
826                        }
827                    } else {
828                        trace!(
829                            "RelationshipManager: {} {} - extract_fk returned None",
830                            child_type,
831                            child.id()
832                        );
833                        no_fk_count += 1;
834                    }
835                }
836
837                trace!(
838                    "RelationshipManager: {} -> {}: {} orphans deleted, {} valid, {} no FK",
839                    child_type, lookup.foreign_type, orphan_count, valid_count, no_fk_count
840                );
841            }
842        }
843
844        Ok(())
845    }
846
847    /// Cleanup orphaned children for OwnsMany relationships
848    fn cleanup_owns_many_orphans(&self, ctx: &CellServerCtx) -> Result<(), PersistError> {
849        trace!(
850            "RelationshipManager: cleanup_owns_many_orphans - checking {} parent types",
851            self.owns_many_by_local.len()
852        );
853
854        for (parent_type, lookups) in &self.owns_many_by_local {
855            trace!(
856                "RelationshipManager: Checking OwnsMany orphans for parent type '{}' ({} lookups)",
857                parent_type,
858                lookups.len()
859            );
860
861            for lookup in lookups {
862                // Get all child IDs referenced by parents using typed extractors
863                let parents = self.get_all_items(ctx, parent_type);
864                let mut referenced_ids: HashSet<Arc<str>> = HashSet::new();
865
866                trace!(
867                    "RelationshipManager: {} ->> {}: Found {} parents in store",
868                    parent_type,
869                    lookup.foreign_type,
870                    parents.len()
871                );
872
873                let mut parents_with_ids = 0;
874                let mut parents_no_ids = 0;
875                for parent in &parents {
876                    if let Some(ids) = (lookup.extract_ids)(parent.as_any()) {
877                        if !ids.is_empty() {
878                            parents_with_ids += 1;
879                        }
880                        referenced_ids.extend(ids);
881                    } else {
882                        parents_no_ids += 1;
883                    }
884                }
885
886                trace!(
887                    "RelationshipManager: {} ->> {}: {} parents have child IDs, {} have no IDs, {} total referenced child IDs",
888                    parent_type,
889                    lookup.foreign_type,
890                    parents_with_ids,
891                    parents_no_ids,
892                    referenced_ids.len()
893                );
894
895                // Get all children and find orphans
896                let children = self.get_all_items(ctx, lookup.foreign_type);
897                trace!(
898                    "RelationshipManager: {} ->> {}: Found {} children in store",
899                    parent_type,
900                    lookup.foreign_type,
901                    children.len()
902                );
903
904                let mut orphan_count = 0;
905                let mut valid_count = 0;
906
907                for child in children {
908                    let child_id = child.id();
909                    if !referenced_ids.contains(&child_id) {
910                        debug!(
911                            "RelationshipManager: ORPHAN {} {} not referenced by any {} (have {} referenced IDs)",
912                            lookup.foreign_type,
913                            child_id,
914                            parent_type,
915                            referenced_ids.len()
916                        );
917                        self.publish_del_cascade(ctx, lookup.foreign_type, &child_id)?;
918                        orphan_count += 1;
919                    } else {
920                        valid_count += 1;
921                    }
922                }
923
924                if orphan_count > 0 {
925                    info!(
926                        "RelationshipManager: {} ->> {}: {} orphans deleted, {} valid",
927                        parent_type, lookup.foreign_type, orphan_count, valid_count
928                    );
929                } else {
930                    trace!(
931                        "RelationshipManager: {} ->> {}: {} orphans deleted, {} valid",
932                        parent_type, lookup.foreign_type, orphan_count, valid_count
933                    );
934                }
935            }
936        }
937
938        Ok(())
939    }
940
941    /// Initialize EnsureFor relationships (create missing derived entities)
942    fn initialize_ensure_for(&self, ctx: &CellServerCtx) -> Result<(), PersistError> {
943        // Track which local_types we've processed to avoid duplicates
944        let mut processed: HashSet<&'static str> = HashSet::new();
945
946        for lookups in self.ensure_for_by_dependency.values() {
947            for lookup in lookups {
948                if processed.contains(lookup.local_type) {
949                    continue;
950                }
951                processed.insert(lookup.local_type);
952
953                // Get all combinations of dependency entities
954                let combinations = self.get_dependency_combinations(ctx, &lookup.dependencies);
955
956                // Snapshot once outside the combo loop
957                let store = ctx.registry.get_or_create(lookup.local_type);
958                let existing_items = store.snapshot();
959
960                let mut created_count = 0;
961
962                for combo in combinations {
963                    // Check if derived entity already exists
964                    let existing = Self::find_ensure_for_entity_in(
965                        &existing_items,
966                        &lookup.dependencies,
967                        &combo,
968                    );
969
970                    if existing.is_none() {
971                        // Create the derived entity using the factory
972                        let entity = (lookup.make_entity)(&combo);
973                        self.publish_set_cascade(ctx, lookup.local_type, entity)?;
974                        created_count += 1;
975                    }
976                }
977
978                if created_count > 0 {
979                    info!(
980                        "RelationshipManager: Created {} {} entities via EnsureFor",
981                        created_count, lookup.local_type
982                    );
983                }
984            }
985        }
986
987        Ok(())
988    }
989
990    // ─────────────────────────────────────────────────────────────────────────────
991    // Query helpers (using CellServerCtx)
992    // ─────────────────────────────────────────────────────────────────────────────
993
994    /// Get an entity by ID
995    fn get_by_id(
996        &self,
997        ctx: &CellServerCtx,
998        entity_type: &str,
999        id: &str,
1000    ) -> Option<Arc<dyn AnyItem>> {
1001        let store = ctx.registry.get_or_create(entity_type);
1002        store.get_value(&id.into())
1003    }
1004
1005    /// Get all entities of a type
1006    fn get_all_items(&self, ctx: &CellServerCtx, entity_type: &str) -> Vec<Arc<dyn AnyItem>> {
1007        let store = ctx.registry.get_or_create(entity_type);
1008        store.snapshot().into_iter().map(|(_, item)| item).collect()
1009    }
1010
1011    /// Get all combinations of dependency entity IDs for EnsureFor
1012    fn get_dependency_combinations(
1013        &self,
1014        ctx: &CellServerCtx,
1015        dependencies: &[EnsureForDependency],
1016    ) -> Vec<Vec<Arc<str>>> {
1017        if dependencies.is_empty() {
1018            return vec![];
1019        }
1020
1021        // Get IDs for each dependency type
1022        let mut dep_ids: Vec<Vec<Arc<str>>> = Vec::new();
1023
1024        for dep in dependencies {
1025            let items = self.get_all_items(ctx, dep.foreign_type);
1026            let ids: Vec<Arc<str>> = items.iter().map(|item| item.id()).collect();
1027            dep_ids.push(ids);
1028        }
1029
1030        // Compute Cartesian product
1031        self.cartesian_product(&dep_ids)
1032    }
1033
1034    /// Compute Cartesian product of multiple ID sets
1035    fn cartesian_product(&self, sets: &[Vec<Arc<str>>]) -> Vec<Vec<Arc<str>>> {
1036        if sets.is_empty() {
1037            return vec![];
1038        }
1039
1040        let mut result = vec![vec![]];
1041
1042        for set in sets {
1043            let mut new_result = Vec::new();
1044            for existing in &result {
1045                for item in set {
1046                    let mut new_combo = existing.clone();
1047                    new_combo.push(item.clone());
1048                    new_result.push(new_combo);
1049                }
1050            }
1051            result = new_result;
1052        }
1053
1054        result
1055    }
1056
1057    /// Find an EnsureFor entity matching the given dependency IDs
1058    /// from a pre-computed snapshot of existing items.
1059    fn find_ensure_for_entity_in(
1060        items: &[(Arc<str>, Arc<dyn AnyItem>)],
1061        dependencies: &[EnsureForDependency],
1062        combo: &[Arc<str>],
1063    ) -> Option<Arc<dyn AnyItem>> {
1064        if dependencies.is_empty() || combo.is_empty() {
1065            return None;
1066        }
1067
1068        items.iter().find_map(|(_, item)| {
1069            // Check if all dependency FKs match the combo values
1070            let all_match = dependencies
1071                .iter()
1072                .zip(combo.iter())
1073                .all(|(dep, expected_id)| {
1074                    (dep.extract_fk)(item.as_any())
1075                        .map(|fk| fk == *expected_id)
1076                        .unwrap_or(false)
1077                });
1078
1079            if all_match { Some(item.clone()) } else { None }
1080        })
1081    }
1082
1083    // ─────────────────────────────────────────────────────────────────────────────
1084    // Publishing helpers (using CellServerCtx)
1085    // ─────────────────────────────────────────────────────────────────────────────
1086
1087    /// Publish a SET for cascade operations.
1088    ///
1089    /// Sets prevent_relationship_updates to avoid infinite loops.
1090    fn publish_set_cascade(
1091        &self,
1092        ctx: &CellServerCtx,
1093        _entity_type: &str,
1094        item: Arc<dyn AnyItem>,
1095    ) -> Result<(), PersistError> {
1096        let options = EventOptions {
1097            prevent_relationship_updates: true,
1098            ..Default::default()
1099        };
1100        ctx.set_dyn_with_options(item, Some(options))
1101    }
1102
1103    fn publish_set_cascade_batch(
1104        &self,
1105        ctx: &CellServerCtx,
1106        items: &[Arc<dyn AnyItem>],
1107    ) -> Result<(), PersistError> {
1108        let options = EventOptions {
1109            prevent_relationship_updates: true,
1110            ..Default::default()
1111        };
1112        ctx.batch_set_dyn_with_options(items, Some(options))
1113    }
1114
1115    /// Publish a DEL for cascade operations.
1116    ///
1117    /// Sets prevent_relationship_updates to avoid infinite loops.
1118    fn publish_del_cascade(
1119        &self,
1120        ctx: &CellServerCtx,
1121        entity_type: &str,
1122        id: &str,
1123    ) -> Result<(), PersistError> {
1124        let options = EventOptions {
1125            prevent_relationship_updates: true,
1126            ..Default::default()
1127        };
1128
1129        // Get the entity from the store
1130        let id_arc: Arc<str> = id.into();
1131        if let Some(item) = ctx.registry.get_or_create(entity_type).get_value(&id_arc) {
1132            debug!(
1133                "RelationshipManager: publish_del_cascade {} {} - entity found, deleting",
1134                entity_type, id
1135            );
1136            ctx.del_dyn_with_options(item, Some(options))?;
1137        } else {
1138            trace!(
1139                "RelationshipManager: publish_del_cascade {} {} - entity NOT found in store",
1140                entity_type, id
1141            );
1142        }
1143
1144        Ok(())
1145    }
1146
1147    fn publish_del_cascade_batch(
1148        &self,
1149        ctx: &CellServerCtx,
1150        items: &[Arc<dyn AnyItem>],
1151    ) -> Result<(), PersistError> {
1152        if items.is_empty() {
1153            return Ok(());
1154        }
1155
1156        let options = EventOptions {
1157            prevent_relationship_updates: true,
1158            ..Default::default()
1159        };
1160        ctx.batch_del_dyn_with_options(items, Some(options))
1161    }
1162}
1163
1164impl Default for RelationshipManager {
1165    fn default() -> Self {
1166        Self::new()
1167    }
1168}
1169
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building an `Arc<str>` ID.
    fn arc_id(value: &str) -> Arc<str> {
        Arc::from(value)
    }

    #[test]
    fn test_relationship_manager_creation() {
        let manager = RelationshipManager::new();

        // The lookup tables are built from inventory, so their sizes depend on
        // which entities are linked into the test binary. Only check that
        // construction and table access do not panic.
        let _ = manager.belongs_to_by_foreign.len();
        let _ = manager.owns_many_by_local.len();
    }

    #[test]
    fn test_cartesian_product() {
        let manager = RelationshipManager::new();

        let sets: Vec<Vec<Arc<str>>> = vec![
            vec![arc_id("a"), arc_id("b")],
            vec![arc_id("1"), arc_id("2")],
        ];

        let product = manager.cartesian_product(&sets);

        assert_eq!(product.len(), 4);
        // Every pairing of a letter with a digit must be present.
        let expected_pairs = [("a", "1"), ("a", "2"), ("b", "1"), ("b", "2")];
        for &(left, right) in expected_pairs.iter() {
            assert!(product.contains(&vec![arc_id(left), arc_id(right)]));
        }
    }

    #[test]
    fn test_cartesian_product_empty() {
        let manager = RelationshipManager::new();

        let no_sets: Vec<Vec<Arc<str>>> = Vec::new();
        assert!(manager.cartesian_product(&no_sets).is_empty());
    }
}