gun/chain.rs
1use crate::core::GunCore;
2use crate::error::GunResult;
3use crate::state::Node;
4use crate::valid::valid;
5use serde_json::Value;
6use std::collections::HashSet;
7use std::sync::Arc;
8
9/// Chain - the main API for interacting with Gun
10/// Based on Gun.js chain.js and IGunChain interface
11/// This provides the fluent API: gun.get('key').put(data).on(callback)
12pub struct Chain {
13 pub core: Arc<GunCore>,
14 pub soul: Option<String>,
15 pub key: Option<String>,
16 pub parent: Option<Arc<Chain>>,
17 pub id: u64,
18 listener_ids: Arc<parking_lot::Mutex<HashSet<u64>>>, // Track listener IDs for off()
19}
20
21impl Chain {
22 pub fn new(core: Arc<GunCore>) -> Self {
23 let id = core.next_chain_id();
24 Self {
25 core,
26 soul: None,
27 key: None,
28 parent: None,
29 id,
30 listener_ids: Arc::new(parking_lot::Mutex::new(HashSet::new())),
31 }
32 }
33
34 pub fn with_soul(core: Arc<GunCore>, soul: String, parent: Option<Arc<Chain>>) -> Self {
35 let id = core.next_chain_id();
36 Self {
37 core,
38 soul: Some(soul),
39 key: None,
40 parent,
41 id,
42 listener_ids: Arc::new(parking_lot::Mutex::new(HashSet::new())),
43 }
44 }
45
46 pub fn with_key(core: Arc<GunCore>, key: String, parent: Arc<Chain>) -> Self {
47 let id = core.next_chain_id();
48 Self {
49 core,
50 soul: None, // Don't inherit parent's soul - each get() creates a new chain without a soul
51 // The soul will be generated in put_object() when self.soul is None
52 key: Some(key),
53 parent: Some(parent),
54 id,
55 listener_ids: Arc::new(parking_lot::Mutex::new(HashSet::new())),
56 }
57 }
58
59 /// Get a property or node by key
60 /// Based on Gun.js chain.get()
61 pub fn get(&self, key: &str) -> Arc<Chain> {
62 Arc::new(Chain::with_key(
63 self.core.clone(),
64 key.to_string(),
65 Arc::new(self.clone()),
66 ))
67 }
68
69 /// Put data into the current node/property
70 /// Based on Gun.js chain.put() - improved implementation
71 pub async fn put(&self, data: Value) -> GunResult<Arc<Chain>> {
72 // Handle function callback (deferred data)
73 // In Rust, this would be handled via async, so we'll skip this case for now
74
75 // Check for content addressing (hash verification for #hash souls)
76 if let Some(ref soul) = self.soul {
77 if soul.starts_with('#') {
78 // Content addressing: verify hash matches content
79 let hash_suffix = soul.strip_prefix('#').unwrap_or(soul); // Remove '#' prefix
80 let data_string = serde_json::to_string(&data)
81 .map_err(crate::error::GunError::Serialization)?;
82 let hash_result = crate::sea::work(
83 data_string.as_bytes(),
84 None,
85 crate::sea::WorkOptions {
86 name: Some("SHA-256".to_string()),
87 encode: Some("base64".to_string()),
88 ..Default::default()
89 },
90 )
91 .await
92 .map_err(|e| crate::error::GunError::Crypto(e.to_string()))?;
93
94 // Compare hash (support both base64 and hex)
95 use base64::Engine as _;
96 let hash_matches = hash_result == hash_suffix
97 || hex::encode(
98 base64::engine::general_purpose::STANDARD_NO_PAD
99 .decode(&hash_result)
100 .unwrap_or_default(),
101 ) == hash_suffix;
102
103 if !hash_matches {
104 return Err(crate::error::GunError::InvalidData(format!(
105 "Content hash mismatch: expected {}, got {}",
106 hash_suffix, hash_result
107 )));
108 }
109 }
110 }
111
112 // Validate data
113 match valid(&data) {
114 Ok(true) => {} // Valid simple value
115 Err(Some(soul)) => {
116 // It's a soul reference, create link
117 let soul_chain = self.core.graph.get(&soul);
118 if soul_chain.is_none() {
119 // Soul doesn't exist yet, we'll need to request it
120 // For now, create the node
121 let node = Node::with_soul(soul.clone());
122 self.core.graph.put(&soul, node)?;
123 }
124 return Ok(Arc::new(Chain::with_soul(
125 self.core.clone(),
126 soul,
127 Some(Arc::new(self.clone())),
128 )));
129 }
130 _ => {
131 // Invalid or object - handle object case
132 if let Value::Object(map) = data {
133 return self.put_object(map).await;
134 }
135 return Err(crate::error::GunError::InvalidData(
136 "Invalid data type".to_string(),
137 ));
138 }
139 }
140
141 // If we have a key but no soul, store the value in the parent node
142 if let Some(key) = &self.key {
143 if let Some(parent) = &self.parent {
144 // Try to resolve parent soul
145 let mut current_parent: Option<&Chain> = Some(parent.as_ref());
146 let mut found_parent_soul: Option<String> = None;
147
148 while let Some(p) = current_parent {
149 if let Some(ps) = &p.soul {
150 found_parent_soul = Some(ps.clone());
151 break;
152 }
153 current_parent = p.parent.as_deref();
154 }
155
156 if let Some(parent_soul) = found_parent_soul {
157 // Store primitive value directly in parent node
158 let mut parent_node = self.core.graph.get(&parent_soul)
159 .unwrap_or_else(|| Node::with_soul(parent_soul.clone()));
160 let state = self.core.state.next();
161 parent_node.data.insert(key.clone(), data.clone());
162 crate::state::State::ify(&mut parent_node, Some(key), Some(state), Some(data.clone()), Some(&parent_soul));
163 self.core.graph.put(&parent_soul, parent_node.clone())?;
164 self.emit_update(&parent_soul, &parent_node.data);
165
166 // Store in persistent storage if available
167 if let Some(storage) = &self.core.storage {
168 storage.put(&parent_soul, &parent_node).await?;
169 }
170
171 return Ok(Arc::new(self.clone()));
172 }
173 }
174 }
175
176 let soul = match &self.soul {
177 Some(s) => s.clone(),
178 None => self.core.uuid(None),
179 };
180
181 // Create or update node
182 let mut node = self
183 .core
184 .graph
185 .get(&soul)
186 .unwrap_or_else(|| Node::with_soul(soul.clone()));
187
188 // Merge data into node
189 if let Some(key) = &self.key {
190 // Setting a property
191 let state = self.core.state.next();
192 node.data.insert(key.clone(), data.clone());
193 crate::state::State::ify(&mut node, Some(key), Some(state), Some(data), Some(&soul));
194 } else {
195 // Setting the whole node - but data is not an object here, so this shouldn't happen
196 // This case is handled above in put_object
197 }
198
199 // Store in graph
200 self.core.graph.put(&soul, node.clone())?;
201
202 // Emit update event
203 self.emit_update(&soul, &node.data);
204
205 // Store in persistent storage if available
206 if let Some(storage) = &self.core.storage {
207 storage.put(&soul, &node).await?;
208 }
209
210 Ok(Arc::new(Chain::with_soul(
211 self.core.clone(),
212 soul,
213 Some(Arc::new(self.clone())),
214 )))
215 }
216
217 /// Helper to put an object (node) with proper traversal
218 async fn put_object(&self, map: serde_json::Map<String, Value>) -> GunResult<Arc<Chain>> {
219 // Parse soul and check for expiration (<? suffix)
220 let (soul, expiration_seconds) = match &self.soul {
221 Some(s) => {
222 // Check for expiration suffix: soul<?3600 (expires after 3600 seconds)
223 if let Some(exp_pos) = s.find("<?") {
224 let soul_part = &s[..exp_pos];
225 let exp_part = &s[exp_pos + 2..];
226 if let Ok(exp_secs) = exp_part.parse::<f64>() {
227 (soul_part.to_string(), Some(exp_secs))
228 } else {
229 (s.clone(), None)
230 }
231 } else {
232 (s.clone(), None)
233 }
234 }
235 None => (self.core.uuid(None), None),
236 };
237
238 // Check expiration if present (for object puts, we check all keys)
239 if let Some(exp_secs) = expiration_seconds {
240 let now = chrono::Utc::now().timestamp_millis() as f64;
241 let expiration_time = now - (exp_secs * 1000.0);
242 if let Some(node) = self.core.graph.get(&soul) {
243 for key in map.keys() {
244 if let Some(state) = crate::state::State::is(&Some(node.clone()), key) {
245 if state < expiration_time {
246 // Data is expired, reject
247 return Err(crate::error::GunError::InvalidData(format!(
248 "Data expired for key {}: state {} is older than expiration {}",
249 key, state, expiration_time
250 )));
251 }
252 }
253 }
254 }
255 }
256
257 let mut node = self
258 .core
259 .graph
260 .get(&soul)
261 .unwrap_or_else(|| Node::with_soul(soul.clone()));
262
263 // Process each key-value pair
264 for (k, v) in map {
265 let state = self.core.state.next();
266
267 // Check if value is a soul reference
268 match valid(&v) {
269 Err(Some(ref_soul)) => {
270 // It's a reference to another node
271 let ref_node = self.core.graph.get(&ref_soul);
272 if ref_node.is_none() {
273 // Create placeholder node when soul reference doesn't exist yet
274 // This matches Gun.js behavior: creating a reference to a non-existent node
275 // creates a placeholder that can be filled in later when the actual node is received
276 let placeholder = Node::with_soul(ref_soul.clone());
277 self.core.graph.put(&ref_soul, placeholder)?;
278 }
279 // Store as soul reference
280 node.data
281 .insert(k.clone(), serde_json::json!({"#": ref_soul}));
282 }
283 _ => {
284 // Regular value - but if it's an object, we should store it as-is
285 // Nested objects are stored directly in the node, not as separate nodes
286 // This allows get("level1") to work by extracting from the parent node
287 node.data.insert(k.clone(), v.clone());
288 }
289 }
290
291 crate::state::State::ify(&mut node, Some(&k), Some(state), Some(v), Some(&soul));
292 }
293
294 self.core.graph.put(&soul, node.clone())?;
295 self.emit_update(&soul, &node.data);
296
297 if let Some(storage) = &self.core.storage {
298 storage.put(&soul, &node).await?;
299 }
300
301 // If we have a key, we need to store the soul reference in the parent node
302 // This allows once() to find the data later via path resolution
303 if let Some(key) = &self.key {
304 if let Some(parent) = &self.parent {
305 eprintln!("DEBUG: put_object() storing soul reference: key={}, soul={}, parent_soul={:?}", key, soul, parent.soul);
306 // Try to resolve or create parent node
307 if let Some(parent_soul) = &parent.soul {
308 // Parent has a soul, store reference there
309 // Create parent node if it doesn't exist
310 let mut parent_node = self.core.graph.get(parent_soul)
311 .unwrap_or_else(|| Node::with_soul(parent_soul.clone()));
312 let state = self.core.state.next();
313 let soul_ref = serde_json::json!({"#": soul});
314 eprintln!("DEBUG: Storing soul reference in parent: parent_soul={}, key={}, soul_ref={}", parent_soul, key, serde_json::to_string(&soul_ref).unwrap_or_default());
315 parent_node.data.insert(key.clone(), soul_ref.clone());
316 crate::state::State::ify(&mut parent_node, Some(key), Some(state), Some(soul_ref), Some(parent_soul));
317 self.core.graph.put(parent_soul, parent_node.clone())?;
318 self.emit_update(parent_soul, &parent_node.data);
319 } else {
320 // Parent has no soul - create one for it
321 // Use the parent's key if available, otherwise use a deterministic approach
322 let parent_soul = if let Some(parent_key) = &parent.key {
323 // Parent has a key - use it to create deterministic soul
324 use sha2::{Sha256, Digest};
325 use base64::{engine::general_purpose, Engine as _};
326 let mut hasher = Sha256::new();
327 hasher.update(parent_key.as_bytes());
328 let hash = general_purpose::STANDARD_NO_PAD.encode(hasher.finalize());
329 format!("root_{}", hash)
330 } else {
331 // Parent has no key - generate a new soul
332 self.core.uuid(None)
333 };
334
335 // Get or create parent node
336 let mut parent_node = self.core.graph.get(&parent_soul)
337 .unwrap_or_else(|| Node::with_soul(parent_soul.clone()));
338
339 // Store the soul reference in the parent node
340 let state = self.core.state.next();
341 let soul_ref = serde_json::json!({"#": soul});
342 parent_node.data.insert(key.clone(), soul_ref.clone());
343 crate::state::State::ify(&mut parent_node, Some(key), Some(state), Some(soul_ref), Some(&parent_soul));
344 self.core.graph.put(&parent_soul, parent_node.clone())?;
345 self.emit_update(&parent_soul, &parent_node.data);
346 }
347 }
348 }
349
350 Ok(Arc::new(Chain::with_soul(
351 self.core.clone(),
352 soul,
353 Some(Arc::new(self.clone())),
354 )))
355 }
356
357 /// Emit update event for listeners (synchronous)
358 fn emit_update(&self, soul: &str, data: &serde_json::Map<String, Value>) {
359 let event_type = format!("node_update:{}", soul);
360 let event = crate::events::Event {
361 event_type: event_type.clone(),
362 data: serde_json::Value::Object(data.clone()),
363 };
364 self.core.events.emit(&event);
365
366 // Also emit graph_update for listeners that don't have a specific soul yet
367 self.core.events.emit(&crate::events::Event {
368 event_type: "graph_update".to_string(),
369 data: serde_json::Value::Object(data.clone()),
370 });
371
372 // Also emit network_sync event for Gun to handle
373 let network_event = crate::events::Event {
374 event_type: "network_sync".to_string(),
375 data: serde_json::json!({
376 "soul": soul,
377 "data": serde_json::Value::Object(data.clone())
378 }),
379 };
380 self.core.events.emit(&network_event);
381 }
382
383 /// Subscribe to updates on this node/property
384 /// Based on Gun.js chain.on() - enhanced with change detection and network sync
385 ///
386 /// Subscribes to real-time updates for this node or property. The callback will be called:
387 /// - Immediately with current data if available
388 /// - Whenever the data changes (with change detection to avoid duplicate updates)
389 /// - When data is received from the network
390 ///
391 /// This method includes:
392 /// - Change detection: Only triggers callback when data actually changes
393 /// - Network synchronization: Triggers sync requests when local data changes
394 /// - Real-time propagation: Receives updates from network peers
395 ///
396 /// # Arguments
397 /// * `callback` - Closure that receives updates
398 /// - First parameter: The updated data value
399 /// - Second parameter: The key if this is a property access, `None` if node access
400 ///
401 /// # Returns
402 /// Returns `Arc<Chain>` for method chaining. Use `off()` to unsubscribe.
403 ///
404 /// # Example
405 /// ```rust,no_run
406 /// use gun::Gun;
407 /// use serde_json::json;
408 ///
409 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
410 /// let gun = Gun::new();
411 ///
412 /// // Subscribe to updates
413 /// let chain = gun.get("counter");
414 /// chain.on(|data, _key| {
415 /// if let Some(value) = data.as_i64() {
416 /// println!("Counter updated: {}", value);
417 /// }
418 /// });
419 ///
420 /// // Updates will trigger the callback
421 /// chain.put(json!(1)).await?;
422 /// chain.put(json!(2)).await?;
423 ///
424 /// // Unsubscribe when done
425 /// chain.off();
426 /// # Ok(())
427 /// # }
428 /// ```
429 pub fn on<F>(&self, callback: F) -> Arc<Chain>
430 where
431 F: Fn(Value, Option<String>) + Send + Sync + Clone + 'static,
432 {
433 let chain = Arc::new(self.clone());
434 let soul = self.soul.clone();
435 let key = self.key.clone();
436 let listener_ids = self.listener_ids.clone();
437 let core = self.core.clone();
438
439 // Store previous value for change detection
440 let prev_value: Arc<parking_lot::Mutex<Option<Value>>> = Arc::new(parking_lot::Mutex::new(None));
441
442 // Try to resolve soul from path if we don't have one (similar to once())
443 // For on(), we always listen to parent node updates if we have a key but no soul
444 let resolved_soul = if let Some(ref s) = &soul {
445 s.clone()
446 } else if let Some(ref k) = &key {
447 // Try to resolve path by checking parent node
448 if let Some(parent) = &self.parent {
449 // Walk up the parent chain to find a node with a soul
450 let mut current_parent: Option<&Chain> = Some(parent.as_ref());
451 let mut found_parent_soul: Option<String> = None;
452
453 while let Some(p) = current_parent {
454 if let Some(ps) = &p.soul {
455 found_parent_soul = Some(ps.clone());
456 break;
457 }
458 current_parent = p.parent.as_deref();
459 }
460
461 if let Some(parent_soul) = found_parent_soul {
462 // Listen to parent node updates - when parent updates, check if our key changed
463 parent_soul
464 } else {
465 // Parent has no soul - use generic event
466 // The parent will get a soul when data is put, and we'll receive updates then
467 "graph_update".to_string()
468 }
469 } else {
470 // No parent - use generic event
471 "graph_update".to_string()
472 }
473 } else {
474 // No key - use generic event
475 "graph_update".to_string()
476 };
477
478 // Determine event type
479 let event_type = if resolved_soul != "graph_update" {
480 format!("node_update:{}", resolved_soul)
481 } else {
482 "graph_update".to_string()
483 };
484
485 // Call callback with current data if available (before setting up listener)
486 if resolved_soul != "graph_update" {
487 if let Some(node) = self.core.graph.get(&resolved_soul) {
488 if let Some(ref k) = &key {
489 if let Some(value) = node.data.get(k) {
490 callback(value.clone(), Some(k.clone()));
491 *prev_value.lock() = Some(value.clone());
492 }
493 } else {
494 let node_data = serde_json::to_value(&node.data).unwrap_or(Value::Null);
495 callback(node_data.clone(), None);
496 *prev_value.lock() = Some(node_data);
497 }
498 } else if let Some(ref k) = &key {
499 // Check parent node for the key
500 if let Some(parent) = &self.parent {
501 // Try to resolve parent soul by walking up the chain
502 let mut current_parent: Option<&Chain> = Some(parent.as_ref());
503 let mut found_parent_soul: Option<String> = None;
504
505 while let Some(p) = current_parent {
506 if let Some(ps) = &p.soul {
507 found_parent_soul = Some(ps.clone());
508 break;
509 }
510 current_parent = p.parent.as_deref();
511 }
512
513 if let Some(parent_soul) = found_parent_soul {
514 if let Some(parent_node) = self.core.graph.get(&parent_soul) {
515 if let Some(value) = parent_node.data.get(k) {
516 // Check if it's a soul reference or nested object
517 if let Some(obj) = value.as_object() {
518 if let Some(soul_ref) = obj.get("#") {
519 if let Some(soul_str) = soul_ref.as_str() {
520 if let Some(ref_node) = self.core.graph.get(soul_str) {
521 let node_data = serde_json::to_value(&ref_node.data).unwrap_or(Value::Null);
522 callback(node_data.clone(), Some(k.clone()));
523 *prev_value.lock() = Some(node_data);
524 } else {
525 callback(value.clone(), Some(k.clone()));
526 *prev_value.lock() = Some(value.clone());
527 }
528 } else {
529 callback(value.clone(), Some(k.clone()));
530 *prev_value.lock() = Some(value.clone());
531 }
532 } else {
533 // Nested object
534 callback(value.clone(), Some(k.clone()));
535 *prev_value.lock() = Some(value.clone());
536 }
537 } else {
538 callback(value.clone(), Some(k.clone()));
539 *prev_value.lock() = Some(value.clone());
540 }
541 }
542 }
543 }
544 }
545 }
546 }
547
548 // Now set up listener with cloned values (all values cloned before move)
549 let key_for_cb = key.clone();
550 let prev_value_for_cb = prev_value.clone();
551 let resolved_soul_for_cb = if resolved_soul != "graph_update" {
552 Some(resolved_soul.clone())
553 } else {
554 None
555 };
556
557 // Clone callback for use in closure
558 let callback_for_cb = callback.clone();
559
560 let core_for_resolve = core.clone();
561 // Build the key path for nested extraction
562 let mut key_path = Vec::new();
563 if let Some(ref k) = &key_for_cb {
564 key_path.push(k.clone());
565 // Walk up the parent chain to collect all keys in the path
566 // This handles cases like get("level1").get("level2")
567 // Note: The current implementation only stores one key per chain,
568 // so for deeply nested paths, we need to reconstruct the path
569 // For now, we'll handle single-level nesting in the callback
570 }
571
572 let cb = Box::new(move |event: &crate::events::Event| {
573 // Extract data and detect changes
574 let new_value = if let Some(ref k) = &key_for_cb {
575 // If we have a key, check if it's in the event data (parent node update)
576 if let Some(data_obj) = event.data.as_object() {
577 if let Some(value) = data_obj.get(k) {
578 // Check if it's a soul reference - if so, resolve it
579 if let Some(obj) = value.as_object() {
580 if let Some(soul_ref) = obj.get("#") {
581 if let Some(soul_str) = soul_ref.as_str() {
582 // It's a soul reference - get the actual node data
583 if let Some(node) = core_for_resolve.graph.get(soul_str) {
584 // For a property access, if node has only one key, return that value
585 // Otherwise return the whole node as object
586 if node.data.len() == 1 {
587 node.data.values().next().cloned().unwrap_or(Value::Null)
588 } else {
589 serde_json::to_value(&node.data).unwrap_or(Value::Null)
590 }
591 } else {
592 // Node not found yet - return the soul reference for now
593 value.clone()
594 }
595 } else {
596 value.clone()
597 }
598 } else {
599 // Not a soul reference - could be nested object
600 // For deeply nested paths like get("level1").get("level2"),
601 // we need to check if the parent chain has a child key
602 // For now, return the nested object and let the parent chain handle extraction
603 value.clone()
604 }
605 } else {
606 // Primitive value
607 value.clone()
608 }
609 } else {
610 Value::Null
611 }
612 } else {
613 Value::Null
614 }
615 } else {
616 event.data.clone()
617 };
618
619 // Check if value has changed (including null changes)
620 let mut prev = prev_value_for_cb.lock();
621 let has_changed = prev.as_ref()
622 .map(|pv| pv != &new_value)
623 .unwrap_or(true);
624
625 // For on(), call callback when value changes (even if null)
626 if has_changed {
627 callback_for_cb(new_value.clone(), key_for_cb.clone());
628 *prev = Some(new_value.clone());
629 }
630 });
631
632 let listener_id = self.core.events.on(&event_type, cb);
633 listener_ids.lock().insert(listener_id);
634
635 // Also try removing from graph_update as fallback when off() is called
636 // Store the event type we used (we'll try both in off())
637
638 chain
639 }
640
641 /// Get data once without subscribing
642 /// Based on Gun.js chain.once() - improved with async waiting and network requests
643 ///
644 /// Retrieves data once without creating a subscription. If data is not found locally,
645 /// sends a network request via DAM protocol and waits for response with timeout (default 5 seconds).
646 ///
647 /// # Arguments
648 /// * `callback` - Closure that receives the data and optional key
649 /// - First parameter: The data value (or `Value::Null` if not found)
650 /// - Second parameter: The key if this is a property access, `None` if node access
651 ///
652 /// # Returns
653 /// Returns `Ok(Arc<Chain>)` for method chaining. The callback is called with:
654 /// - The data if found locally or received from network
655 /// - `Value::Null` if data not found and network request times out or fails
656 ///
657 /// # Errors
658 /// Returns `GunError` if there's an error during the operation
659 ///
660 /// # Example
661 /// ```rust,no_run
662 /// use gun::Gun;
663 /// use serde_json::json;
664 ///
665 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
666 /// let gun = Gun::new();
667 ///
668 /// // Put data first
669 /// gun.get("user").put(json!({"name": "Alice"})).await?;
670 ///
671 /// // Read once
672 /// gun.get("user").once(|data, _key| {
673 /// if let Some(obj) = data.as_object() {
674 /// println!("Name: {:?}", obj.get("name"));
675 /// }
676 /// }).await?;
677 /// # Ok(())
678 /// # }
679 /// ```
680 pub async fn once<F>(&self, callback: F) -> GunResult<Arc<Chain>>
681 where
682 F: FnOnce(Value, Option<String>),
683 {
684 // Try to resolve soul from path if we don't have one
685 let mut resolved_soul_opt: Option<String> = None;
686 let soul = match &self.soul {
687 Some(s) => s.clone(),
688 None => {
689 // Try to resolve path by checking if we can find data through the parent chain
690 // This handles cases like: gun.get("test").get("read_test").put(obj).once(...)
691 if let Some(key) = &self.key {
692 if let Some(parent) = &self.parent {
693 // Try to find the soul by looking in parent's node
694 if let Some(parent_soul) = &parent.soul {
695 if let Some(parent_node) = self.core.graph.get(parent_soul) {
696 if let Some(value) = parent_node.data.get(key) {
697 // Check if it's a soul reference
698 if let Some(obj) = value.as_object() {
699 if let Some(soul_ref) = obj.get("#") {
700 if let Some(soul_str) = soul_ref.as_str() {
701 // Found a soul reference, use it
702 let resolved_soul = soul_str.to_string();
703 eprintln!("DEBUG: once() resolved path to soul: {}", resolved_soul);
704 // Continue with the resolved soul
705 if let Some(node) = self.core.graph.get(&resolved_soul) {
706 let node_data = serde_json::to_value(&node.data).unwrap_or(Value::Null);
707 eprintln!("DEBUG: once() found node locally, calling callback with data");
708 callback(node_data, self.key.clone());
709 return Ok(Arc::new(self.clone()));
710 } else {
711 eprintln!("DEBUG: once() resolved soul {} but node not found locally, will request from network", resolved_soul);
712 // Store resolved soul for network request
713 resolved_soul_opt = Some(resolved_soul);
714 }
715 } else {
716 // Object but no "#" key - it's a nested object, return it directly
717 eprintln!("DEBUG: once() found nested object in parent node");
718 callback(value.clone(), self.key.clone());
719 return Ok(Arc::new(self.clone()));
720 }
721 } else {
722 // Object but no "#" key - it's a nested object, return it directly
723 eprintln!("DEBUG: once() found nested object in parent node");
724 callback(value.clone(), self.key.clone());
725 return Ok(Arc::new(self.clone()));
726 }
727 } else {
728 // Not a soul reference - could be a nested object or primitive value
729 // Return the value directly
730 eprintln!("DEBUG: once() found value directly in parent node (not a soul reference): {:?}", if value.is_object() { "object" } else { "primitive" });
731 callback(value.clone(), self.key.clone());
732 return Ok(Arc::new(self.clone()));
733 }
734 } else {
735 eprintln!("DEBUG: once() key {} not found in parent node {}, will request from network", key, parent_soul);
736 // Key not found in parent - might be nested deeper, need to wait for network
737 }
738 } else {
739 eprintln!("DEBUG: once() parent node {} not found, will request parent node from network first", parent_soul);
740 // Request parent node first
741 let get_request = serde_json::json!({
742 "get": {
743 "#": parent_soul
744 }
745 });
746 self.core.events.emit(&crate::events::Event {
747 event_type: "get_request".to_string(),
748 data: get_request,
749 });
750 }
751 } else {
752 // Parent has no soul - try to resolve it by walking up the chain
753 // or by looking for the parent node in the graph
754 eprintln!("DEBUG: once() parent has no soul, trying to resolve parent node");
755
756 // Walk up the parent chain to find a node with a soul
757 let mut current_parent: Option<&Chain> = Some(parent.as_ref());
758 let mut found_parent_soul: Option<String> = None;
759
760 while let Some(p) = current_parent {
761 if let Some(ps) = &p.soul {
762 found_parent_soul = Some(ps.clone());
763 break;
764 }
765 // Try grandparent
766 current_parent = p.parent.as_deref();
767 }
768
769 // If we found a parent soul, try to get the value from that node
770 if let Some(parent_soul) = found_parent_soul {
771 if let Some(parent_node) = self.core.graph.get(&parent_soul) {
772 // Check if parent has our key
773 if let Some(key) = &self.key {
774 // First check if parent has the key directly (nested object in parent)
775 if let Some(value) = parent_node.data.get(key) {
776 // Check if it's a soul reference or nested object
777 if let Some(obj) = value.as_object() {
778 if obj.get("#").is_some() {
779 // It's a soul reference - get that node and extract the key from it
780 if let Some(soul_str) = obj.get("#").and_then(|v| v.as_str()) {
781 if let Some(ref_node) = self.core.graph.get(soul_str) {
782 // The key might be in the referenced node
783 if let Some(nested_value) = ref_node.data.get(key) {
784 eprintln!("DEBUG: once() found key '{}' in referenced node {}", key, soul_str);
785 callback(nested_value.clone(), self.key.clone());
786 return Ok(Arc::new(self.clone()));
787 } else {
788 // Return the whole node data
789 let node_data = serde_json::to_value(&ref_node.data).unwrap_or(Value::Null);
790 eprintln!("DEBUG: once() found soul reference '{}' in resolved parent node {}, returning node data", key, parent_soul);
791 callback(node_data, self.key.clone());
792 return Ok(Arc::new(self.clone()));
793 }
794 }
795 }
796 } else {
797 // It's a nested object - return it directly
798 eprintln!("DEBUG: once() extracting nested object '{}' from resolved parent node {}", key, parent_soul);
799 callback(value.clone(), self.key.clone());
800 return Ok(Arc::new(self.clone()));
801 }
802 } else {
803 // Primitive value - return it
804 eprintln!("DEBUG: once() extracting primitive '{}' from resolved parent node {}", key, parent_soul);
805 callback(value.clone(), self.key.clone());
806 return Ok(Arc::new(self.clone()));
807 }
808 }
809
810 // Key not found directly in parent - check if parent has a soul reference to a node that might contain it
811 // Walk through all values in parent node to find soul references
812 for (parent_key, parent_value) in &parent_node.data {
813 if let Some(obj) = parent_value.as_object() {
814 if let Some(soul_ref) = obj.get("#") {
815 if let Some(soul_str) = soul_ref.as_str() {
816 // Found a soul reference - check if the referenced node has our key
817 if let Some(ref_node) = self.core.graph.get(soul_str) {
818 if let Some(nested_value) = ref_node.data.get(key) {
819 eprintln!("DEBUG: once() found key '{}' in node {} referenced by parent key '{}'", key, soul_str, parent_key);
820 callback(nested_value.clone(), self.key.clone());
821 return Ok(Arc::new(self.clone()));
822 }
823
824 // Also check if any value in the referenced node is a nested object containing our key
825 // This handles deeply nested paths like level1.level2.level3
826 for (ref_key, ref_value) in &ref_node.data {
827 if let Some(nested_obj) = ref_value.as_object() {
828 // Check if this nested object has our key (for deeply nested like level1.level2.level3)
829 if nested_obj.get("#").is_none() {
830 // Recursively search nested objects
831 fn find_in_nested(obj: &serde_json::Map<String, serde_json::Value>, search_key: &str) -> Option<serde_json::Value> {
832 if let Some(val) = obj.get(search_key) {
833 return Some(val.clone());
834 }
835 for (_, val) in obj {
836 if let Some(nested) = val.as_object() {
837 if nested.get("#").is_none() {
838 if let Some(found) = find_in_nested(nested, search_key) {
839 return Some(found);
840 }
841 }
842 }
843 }
844 None
845 }
846
847 if let Some(deep_value) = find_in_nested(nested_obj, key) {
848 eprintln!("DEBUG: once() found deeply nested key '{}' in nested object '{}' in node {}", key, ref_key, soul_str);
849 callback(deep_value.clone(), self.key.clone());
850 return Ok(Arc::new(self.clone()));
851 }
852 }
853 }
854 }
855 }
856 }
857 } else {
858 // Not a soul reference - could be a nested object
859 // Check if this nested object contains our key (for cases like level1.level2)
860 if let Some(nested_obj) = parent_value.as_object() {
861 if nested_obj.get(key).is_some() {
862 if let Some(deep_value) = nested_obj.get(key) {
863 eprintln!("DEBUG: once() found key '{}' in nested object '{}' in parent node {}", key, parent_key, parent_soul);
864 callback(deep_value.clone(), self.key.clone());
865 return Ok(Arc::new(self.clone()));
866 }
867 }
868 }
869 }
870 }
871 }
872
873 eprintln!("DEBUG: once() key '{}' not found in resolved parent node {} or its referenced nodes", key, parent_soul);
874 }
875 } else {
876 eprintln!("DEBUG: once() resolved parent soul {} but node not found in graph", parent_soul);
877 }
878 } else {
879 eprintln!("DEBUG: once() could not resolve parent soul by walking up chain");
880 }
881
882 eprintln!("DEBUG: once() parent has no soul, will try to wait for network data");
883 }
884 }
885 }
886 // Could not resolve path - wait for network data instead of returning immediately
887 // Use resolved soul if we found one, otherwise generate a new one
888 resolved_soul_opt.unwrap_or_else(|| self.core.uuid(None));
889 // The data might be syncing from another client, so we should wait
890 // We'll use a generic listener that waits for any update to the parent path
891 if let Some(key) = &self.key {
892 if let Some(parent) = &self.parent {
893 // Set up a listener for parent updates that might contain our key
894 let key_clone = key.clone();
895 let callback_clone = callback;
896 let parent_soul_opt = parent.soul.clone();
897
898 // Wait for parent node to be updated with our key
899 let data_received: Arc<parking_lot::Mutex<Option<serde_json::Value>>> = Arc::new(parking_lot::Mutex::new(None));
900 let data_received_clone = data_received.clone();
901 let data_ready: Arc<parking_lot::Mutex<bool>> = Arc::new(parking_lot::Mutex::new(false));
902 let data_ready_clone = data_ready.clone();
903
904 // Listen for parent node updates
905 let event_type = if let Some(ref ps) = parent_soul_opt {
906 format!("node_update:{}", ps)
907 } else {
908 "graph_update".to_string()
909 };
910
911 let listener_id = self.core.events.on(&event_type, Box::new(move |event: &crate::events::Event| {
912 if let Some(data_obj) = event.data.as_object() {
913 if let Some(value) = data_obj.get(&key_clone) {
914 // Found our key in the update
915 *data_received_clone.lock() = Some(value.clone());
916 *data_ready_clone.lock() = true;
917 }
918 }
919 }));
920
921 // Also check if data is already available
922 if let Some(ref ps) = parent_soul_opt {
923 if let Some(parent_node) = self.core.graph.get(ps) {
924 if let Some(value) = parent_node.data.get(key) {
925 // Check if it's a soul reference
926 if let Some(obj) = value.as_object() {
927 if let Some(soul_ref) = obj.get("#") {
928 if let Some(soul_str) = soul_ref.as_str() {
929 // Found a soul reference, get the node
930 if let Some(node) = self.core.graph.get(soul_str) {
931 let node_data = serde_json::to_value(&node.data).unwrap_or(Value::Null);
932 self.core.events.off(&event_type, listener_id);
933 callback_clone(node_data, Some(key.clone()));
934 return Ok(Arc::new(self.clone()));
935 }
936 }
937 }
938 }
939 // Not a soul reference - could be nested object or primitive
940 // Return the value directly
941 self.core.events.off(&event_type, listener_id);
942 callback_clone(value.clone(), Some(key.clone()));
943 return Ok(Arc::new(self.clone()));
944 }
945 }
946 }
947
948 // Wait for data with longer timeout (15 seconds for network sync)
949 let timeout_duration = tokio::time::Duration::from_secs(15);
950 let start = std::time::Instant::now();
951
952 loop {
953 if start.elapsed() > timeout_duration {
954 break; // Timeout
955 }
956
957 let ready = *data_ready.lock();
958 if ready {
959 break; // Data received
960 }
961
962 // Also check periodically if data arrived
963 if let Some(ref ps) = parent_soul_opt {
964 if let Some(parent_node) = self.core.graph.get(ps) {
965 if let Some(value) = parent_node.data.get(key) {
966 *data_received.lock() = Some(value.clone());
967 *data_ready.lock() = true;
968 break;
969 }
970 }
971 }
972
973 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
974 }
975
976 // Remove listener
977 self.core.events.off(&event_type, listener_id);
978
979 // Process result
980 let received_data = data_received.lock().take();
981 let value = received_data.unwrap_or(Value::Null);
982 callback_clone(value, Some(key.clone()));
983 return Ok(Arc::new(self.clone()));
984 }
985 }
986 // No parent and no soul - return null immediately
987 callback(Value::Null, self.key.clone());
988 return Ok(Arc::new(self.clone()));
989 }
990 };
991
992 // Try to get from graph immediately
993 // First check if we have a key and can extract from parent (for nested objects)
994 // This handles cases where parent chain has no soul but parent node exists
995 if let Some(key) = &self.key {
996 if let Some(parent) = &self.parent {
997 // Try to find parent node even if parent chain has no soul
998 // Walk up the parent chain to find a node with a soul
999 let mut current_parent: Option<&Chain> = Some(parent.as_ref());
1000 while let Some(p) = current_parent {
1001 if let Some(parent_soul) = &p.soul {
1002 if let Some(parent_node) = self.core.graph.get(parent_soul) {
1003 if let Some(value) = parent_node.data.get(key) {
1004 // Check if it's a soul reference or nested object
1005 if let Some(obj) = value.as_object() {
1006 if obj.get("#").is_some() {
1007 // It's a soul reference - try to get that node
1008 if let Some(soul_str) = obj.get("#").and_then(|v| v.as_str()) {
1009 if let Some(ref_node) = self.core.graph.get(soul_str) {
1010 let node_data = serde_json::to_value(&ref_node.data).unwrap_or(Value::Null);
1011 eprintln!("DEBUG: once() found soul reference '{}' in parent node {}", key, parent_soul);
1012 callback(node_data, self.key.clone());
1013 return Ok(Arc::new(self.clone()));
1014 }
1015 }
1016 } else {
1017 // It's a nested object - return it directly
1018 eprintln!("DEBUG: once() extracting nested object '{}' from parent node {}", key, parent_soul);
1019 callback(value.clone(), self.key.clone());
1020 return Ok(Arc::new(self.clone()));
1021 }
1022 } else {
1023 // Primitive value - return it
1024 eprintln!("DEBUG: once() extracting primitive '{}' from parent node {}", key, parent_soul);
1025 callback(value.clone(), self.key.clone());
1026 return Ok(Arc::new(self.clone()));
1027 }
1028 }
1029 }
1030 break; // Found parent with soul, stop searching
1031 }
1032 // Try grandparent
1033 current_parent = p.parent.as_deref();
1034 }
1035 }
1036 }
1037
1038 // Try to get from graph by soul
1039 if let Some(node) = self.core.graph.get(&soul) {
1040 let value = if let Some(key) = &self.key {
1041 node.data.get(key).cloned().unwrap_or(Value::Null)
1042 } else {
1043 serde_json::to_value(&node.data).unwrap_or(Value::Null)
1044 };
1045 callback(value, self.key.clone());
1046 return Ok(Arc::new(self.clone()));
1047 }
1048
1049 // Data not found locally - request from network
1050 // Set up a one-time listener for this soul
1051 let event_type = format!("node_update:{}", soul);
1052
1053 // Use shared data location since oneshot::Sender can't be cloned
1054 let data_received: Arc<parking_lot::Mutex<Option<serde_json::Value>>> = Arc::new(parking_lot::Mutex::new(None));
1055 let data_received_clone = data_received.clone();
1056 let data_ready: Arc<parking_lot::Mutex<bool>> = Arc::new(parking_lot::Mutex::new(false));
1057 let data_ready_clone = data_ready.clone();
1058
1059 // Register a one-time listener (using Fn closure, not FnOnce)
1060 let listener_id = self.core.events.on(&event_type, Box::new(move |event: &crate::events::Event| {
1061 // Extract the data from the event
1062 let data = event.data.clone();
1063 *data_received_clone.lock() = Some(data);
1064 *data_ready_clone.lock() = true;
1065 }));
1066
1067 // Emit a get request event that the mesh can listen to
1068 // Include key if we have one (for nested properties)
1069 let mut get_obj = serde_json::json!({
1070 "#": soul
1071 });
1072 if let Some(key) = &self.key {
1073 get_obj["."] = serde_json::Value::String(key.clone());
1074 }
1075 let get_request = serde_json::json!({
1076 "get": get_obj
1077 });
1078 eprintln!("DEBUG: Emitting get_request for soul {} with key {:?}", soul, self.key);
1079 self.core.events.emit(&crate::events::Event {
1080 event_type: "get_request".to_string(),
1081 data: get_request,
1082 });
1083
1084 // Wait for response with timeout (20 seconds for network sync - increased for relay server delays)
1085 let timeout_duration = tokio::time::Duration::from_secs(20);
1086 let start = std::time::Instant::now();
1087
1088 // Poll for data with timeout
1089 loop {
1090 if start.elapsed() > timeout_duration {
1091 eprintln!("DEBUG: once() timeout waiting for data for soul {}", soul);
1092 break; // Timeout
1093 }
1094
1095 let ready = *data_ready.lock();
1096 if ready {
1097 eprintln!("DEBUG: once() data received for soul {}", soul);
1098 break; // Data received
1099 }
1100
1101 // Check periodically if data arrived in graph (in case event wasn't emitted)
1102 if let Some(node) = self.core.graph.get(&soul) {
1103 let value = if let Some(key) = &self.key {
1104 node.data.get(key).cloned().unwrap_or(Value::Null)
1105 } else {
1106 serde_json::to_value(&node.data).unwrap_or(Value::Null)
1107 };
1108 if !value.is_null() {
1109 eprintln!("DEBUG: once() found data in graph for soul {}, calling callback", soul);
1110 *data_received.lock() = Some(value.clone());
1111 *data_ready.lock() = true;
1112 break;
1113 }
1114 }
1115
1116 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1117 }
1118
1119 // Remove listener
1120 self.core.events.off(&event_type, listener_id);
1121
1122 // Process result
1123 let received_data = data_received.lock().take();
1124 let value = if let Some(data) = received_data {
1125 if let Some(key) = &self.key {
1126 data.get(key).cloned().unwrap_or(Value::Null)
1127 } else {
1128 data
1129 }
1130 } else {
1131 Value::Null
1132 };
1133 callback(value, self.key.clone());
1134
1135 Ok(Arc::new(self.clone()))
1136 }
1137
1138 /// Map over properties of a node
1139 /// Based on Gun.js chain.map() - complete implementation
1140 pub fn map<F>(&self, callback: F) -> Arc<Chain>
1141 where
1142 F: Fn(Value, String) + Send + Sync + Clone + 'static,
1143 {
1144 let chain = Arc::new(self.clone());
1145 let listener_ids = self.listener_ids.clone();
1146
1147 // Resolve soul the same way on() does
1148 let resolved_soul = if let Some(ref s) = &self.soul {
1149 s.clone()
1150 } else if let Some(ref k) = &self.key {
1151 // Try to resolve path by checking parent node
1152 if let Some(parent) = &self.parent {
1153 // Walk up the parent chain to find a node with a soul
1154 let mut current_parent: Option<&Chain> = Some(parent.as_ref());
1155 let mut found_parent_soul: Option<String> = None;
1156
1157 while let Some(p) = current_parent {
1158 if let Some(ps) = &p.soul {
1159 found_parent_soul = Some(ps.clone());
1160 break;
1161 }
1162 current_parent = p.parent.as_deref();
1163 }
1164
1165 if let Some(parent_soul) = found_parent_soul {
1166 parent_soul
1167 } else {
1168 "graph_update".to_string()
1169 }
1170 } else {
1171 "graph_update".to_string()
1172 }
1173 } else {
1174 "graph_update".to_string()
1175 };
1176
1177 // Clone callback for use in closure
1178 let callback_clone = callback.clone();
1179 let key_for_map = self.key.clone();
1180
1181 // Subscribe to updates and call callback for each property
1182 if resolved_soul != "graph_update" {
1183 let event_type = format!("node_update:{}", resolved_soul);
1184 let core_for_map = self.core.clone();
1185 let cb = Box::new(move |event: &crate::events::Event| {
1186 if let Some(data_obj) = event.data.as_object() {
1187 // Check if we have a key - if so, the data might be in a referenced node
1188 if let Some(ref k) = &key_for_map {
1189 if let Some(value) = data_obj.get(k) {
1190 // Check if it's a soul reference
1191 if let Some(obj) = value.as_object() {
1192 if let Some(soul_ref) = obj.get("#") {
1193 if let Some(soul_str) = soul_ref.as_str() {
1194 // It's a soul reference - map over the referenced node
1195 if let Some(ref_node) = core_for_map.graph.get(soul_str) {
1196 for (key, value) in ref_node.data.iter() {
1197 if key != "_" && !key.starts_with('>') {
1198 callback_clone(value.clone(), key.clone());
1199 }
1200 }
1201 return;
1202 }
1203 }
1204 }
1205 }
1206 }
1207 }
1208 // No key or not a soul reference - map over the event data directly
1209 for (key, value) in data_obj {
1210 if key != "_" && !key.starts_with('>') {
1211 callback_clone(value.clone(), key.clone());
1212 }
1213 }
1214 }
1215 });
1216
1217 let listener_id = self.core.events.on(&event_type, cb);
1218 listener_ids.lock().insert(listener_id);
1219
1220 // Also call for current data if available
1221 if let Some(node) = self.core.graph.get(&resolved_soul) {
1222 // Check if we have a key - if so, the data might be in a referenced node
1223 if let Some(ref k) = &self.key {
1224 if let Some(value) = node.data.get(k) {
1225 // Check if it's a soul reference
1226 if let Some(obj) = value.as_object() {
1227 if let Some(soul_ref) = obj.get("#") {
1228 if let Some(soul_str) = soul_ref.as_str() {
1229 // It's a soul reference - map over the referenced node
1230 if let Some(ref_node) = self.core.graph.get(soul_str) {
1231 for (key, value) in ref_node.data.iter() {
1232 if key != "_" && !key.starts_with('>') {
1233 callback(value.clone(), key.clone());
1234 }
1235 }
1236 return chain;
1237 }
1238 }
1239 }
1240 }
1241 }
1242 }
1243 // No key or not a soul reference - map over the node data directly
1244 for (key, value) in node.data.iter() {
1245 if key != "_" && !key.starts_with('>') {
1246 callback(value.clone(), key.clone());
1247 }
1248 }
1249 }
1250 }
1251
1252 chain
1253 }
1254
1255 /// Add item to a set
1256 /// Based on Gun.js chain.set() - proper set implementation
1257 pub async fn set(&self, item: Value) -> GunResult<Arc<Chain>> {
1258 // Check if item has a soul (is a node reference)
1259 let soul = match valid(&item) {
1260 Err(Some(ref_soul)) => Some(ref_soul.clone()),
1261 _ => {
1262 // Item doesn't have a soul, generate one
1263 // set() expects objects (nodes) to be added to the set
1264 if item.is_object() {
1265 let new_soul = self.core.uuid(None);
1266 // Create node for the item
1267 if let Value::Object(ref map) = item {
1268 let mut node = Node::with_soul(new_soul.clone());
1269 for (k, v) in map {
1270 let state = self.core.state.next();
1271 node.data.insert(k.clone(), v.clone());
1272 crate::state::State::ify(
1273 &mut node,
1274 Some(k),
1275 Some(state),
1276 Some(v.clone()),
1277 Some(&new_soul),
1278 );
1279 }
1280 self.core.graph.put(&new_soul, node.clone())?;
1281 if let Some(storage) = &self.core.storage {
1282 storage.put(&new_soul, &node).await?;
1283 }
1284 }
1285 Some(new_soul)
1286 } else {
1287 // Non-object items can't be in sets
1288 return Err(crate::error::GunError::InvalidData(
1289 "set() only accepts objects/nodes".to_string(),
1290 ));
1291 }
1292 }
1293 };
1294
1295 if let Some(ref_soul) = soul {
1296 // Add reference to the set node
1297 let set_soul = self.soul.clone().unwrap_or_else(|| self.core.uuid(None));
1298 let mut set_node = self
1299 .core
1300 .graph
1301 .get(&set_soul)
1302 .unwrap_or_else(|| Node::with_soul(set_soul.clone()));
1303
1304 // Store reference to the item
1305 let key = ref_soul.clone();
1306 let state = self.core.state.next();
1307 set_node
1308 .data
1309 .insert(key.clone(), serde_json::json!({"#": ref_soul}));
1310 crate::state::State::ify(
1311 &mut set_node,
1312 Some(&key),
1313 Some(state),
1314 Some(serde_json::json!({"#": ref_soul})),
1315 Some(&set_soul),
1316 );
1317
1318 self.core.graph.put(&set_soul, set_node.clone())?;
1319 self.emit_update(&set_soul, &set_node.data);
1320
1321 if let Some(storage) = &self.core.storage {
1322 storage.put(&set_soul, &set_node).await?;
1323 }
1324
1325 Ok(Arc::new(Chain::with_soul(
1326 self.core.clone(),
1327 set_soul,
1328 Some(Arc::new(self.clone())),
1329 )))
1330 } else {
1331 self.put(item).await
1332 }
1333 }
1334
1335 /// Go back up the chain
1336 /// Based on Gun.js chain.back()
1337 pub fn back(&self, amount: Option<usize>) -> Option<Arc<Chain>> {
1338 match amount {
1339 Some(0) | None => self.parent.clone(),
1340 Some(1) => self.parent.clone(),
1341 Some(n) => {
1342 // Go back n levels
1343 let mut current = self.parent.clone();
1344 for _ in 1..n {
1345 current = current.and_then(|c| c.parent.clone());
1346 }
1347 current
1348 }
1349 }
1350 }
1351
1352 /// Remove all listeners for this chain
1353 /// Based on Gun.js chain.off() - properly removes listeners
1354 pub fn off(&self) -> Arc<Chain> {
1355 let listener_ids = self.listener_ids.lock();
1356 let ids: Vec<u64> = listener_ids.iter().cloned().collect();
1357 drop(listener_ids); // Release lock
1358
1359 // Resolve soul the same way on() does
1360 let resolved_soul = if let Some(ref s) = &self.soul {
1361 s.clone()
1362 } else if let Some(ref k) = &self.key {
1363 // Try to resolve path by checking parent node
1364 if let Some(parent) = &self.parent {
1365 // Walk up the parent chain to find a node with a soul
1366 let mut current_parent: Option<&Chain> = Some(parent.as_ref());
1367 let mut found_parent_soul: Option<String> = None;
1368
1369 while let Some(p) = current_parent {
1370 if let Some(ps) = &p.soul {
1371 found_parent_soul = Some(ps.clone());
1372 break;
1373 }
1374 current_parent = p.parent.as_deref();
1375 }
1376
1377 if let Some(parent_soul) = found_parent_soul {
1378 parent_soul
1379 } else {
1380 "graph_update".to_string()
1381 }
1382 } else {
1383 "graph_update".to_string()
1384 }
1385 } else {
1386 "graph_update".to_string()
1387 };
1388
1389 let event_type = if resolved_soul != "graph_update" {
1390 format!("node_update:{}", resolved_soul)
1391 } else {
1392 "graph_update".to_string()
1393 };
1394
1395 // Try removing from the resolved event type
1396 for id in ids.iter() {
1397 self.core.events.off(&event_type, *id);
1398 }
1399
1400 // Also try removing from graph_update as fallback (in case listener was registered with different type)
1401 if event_type != "graph_update" {
1402 for id in ids.iter() {
1403 self.core.events.off("graph_update", *id);
1404 }
1405 }
1406
1407 self.listener_ids.lock().clear();
1408 Arc::new(self.clone())
1409 }
1410}
1411
1412impl Clone for Chain {
1413 fn clone(&self) -> Self {
1414 Self {
1415 core: self.core.clone(),
1416 soul: self.soul.clone(),
1417 key: self.key.clone(),
1418 parent: self.parent.clone(),
1419 id: self.id,
1420 listener_ids: self.listener_ids.clone(),
1421 }
1422 }
1423}