use rx_rs::prelude::*;
use std::cell::RefCell;
use std::rc::Rc;
/// Diagnostic test: flat-maps an `RxRef`'s value onto a clone of the same
/// `RxRef` and tracks the source's subscriber count before creating, while
/// holding, and after dropping the flat-mapped value.
///
/// Hard assertions cover only the initial count (zero), the flattened value
/// (1), and that the count rises while the flat-mapped value is alive. A
/// count that remains above the baseline after the drop is reported on
/// stdout only; it does not fail the test.
#[test]
fn test_self_referencing_flatmap_creates_cycle() {
    let source = RxRef::new(1);
    let baseline = source.subscriber_count();
    assert_eq!(baseline, 0);

    // The closure captures a clone of the very cell being flat-mapped.
    let inner_handle = source.clone();
    let derived = source.val().flat_map(move |_| inner_handle.val());
    assert_eq!(derived.get(), 1);

    let while_alive = source.subscriber_count();
    println!("Subscribers during flat_map: {}", while_alive);
    assert!(
        while_alive > baseline,
        "Expected subscribers to be created"
    );

    drop(derived);
    let after_drop = source.subscriber_count();
    println!("Subscribers after drop: {}", after_drop);

    // Nothing further to report once the count is back at (or below) the baseline.
    if after_drop <= baseline {
        return;
    }

    println!("POTENTIAL LEAK DETECTED!");
    println!(
        "Initial: {}, During: {}, After drop: {}",
        baseline, while_alive, after_drop
    );
    println!("Subscribers not cleaned up properly.");
    println!("This indicates a reference cycle.");
}
/// Diagnostic test: creates a self-referencing `flat_map` on an `RxRef`,
/// reads it once, and checks that the source reports at least one subscriber
/// while the flat-mapped value is alive.
///
/// The subscriber count after the flat-mapped value is dropped is printed
/// but not asserted.
#[test]
fn test_self_flatmap_leak_with_subscriber_count() {
    let source = RxRef::new(1);
    assert_eq!(source.subscriber_count(), 0);

    let inner_handle = source.clone();
    let derived = source.val().flat_map(move |_| inner_handle.val());
    // Read the value once; the result itself is not needed.
    let _ = derived.get();

    let live_count = source.subscriber_count();
    println!("Subscribers during flat_map: {}", live_count);
    assert!(live_count > 0, "Expected subscribers to be created");

    // Release the flat-mapped value before taking the final reading.
    drop(derived);

    let remaining = source.subscriber_count();
    println!("Subscribers after drop: {}", remaining);
}
/// Diagnostic test: wires two `RxRef`s into a mutual `flat_map` arrangement
/// (`a` flat-maps onto `b`, and `b` flat-maps onto `a`) and prints each
/// source's subscriber count before and after both flat-mapped values are
/// dropped.
///
/// Only the flattened values are asserted (`a_flat` yields 2, `b_flat`
/// yields 1). Any subscribers remaining after the drops are reported on
/// stdout and do not fail the test.
#[test]
fn test_circular_flatmap_chain() {
    let a = RxRef::new(1);
    let b = RxRef::new(2);

    // Each closure owns a handle to the *other* cell.
    let b_handle = b.clone();
    let a_handle = a.clone();
    let a_flat = a.val().flat_map(move |_| b_handle.val());
    let b_flat = b.val().flat_map(move |_| a_handle.val());

    assert_eq!(a_flat.get(), 2);
    assert_eq!(b_flat.get(), 1);

    let (a_live, b_live) = (a.subscriber_count(), b.subscriber_count());
    println!(
        "A subscribers: {}, B subscribers: {}",
        a_live, b_live
    );

    drop(a_flat);
    drop(b_flat);

    let (a_remaining, b_remaining) = (a.subscriber_count(), b.subscriber_count());
    println!(
        "After drop - A subscribers: {}, B subscribers: {}",
        a_remaining, b_remaining
    );

    let a_still_subscribed = a_remaining > 0;
    let b_still_subscribed = b_remaining > 0;
    if a_still_subscribed || b_still_subscribed {
        println!("POTENTIAL CIRCULAR REFERENCE LEAK DETECTED!");
        println!("Subscribers not fully cleaned up after dropping flat_mapped values");
    }
}
/// Diagnostic test: applies `flat_map_observable` to an `RxRef`'s value with
/// a closure that returns the same `RxRef`'s stream, subscribes a collector
/// to the result, and then sets the source twice.
///
/// Nothing is asserted. The subscriber counts and the collected values are
/// printed for inspection.
#[test]
fn test_flatmap_observable_self_subscribe() {
    let tracker = DisposableTracker::new();
    let source = RxRef::new(1);

    let inner_handle = source.clone();
    let flattened = source
        .val()
        .flat_map_observable(move |_| inner_handle.stream());

    // Shared buffer recording every value delivered to the subscriber.
    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    flattened.subscribe(tracker.tracker(), move |val| {
        sink.borrow_mut().push(*val);
    });

    let live_count = source.subscriber_count();
    println!("Source subscribers after flat_map_observable: {}", live_count);

    source.set(2);
    source.set(3);

    println!("Values emitted: {:?}", seen.borrow());
    println!("Final subscriber count: {}", source.subscriber_count());
}