ruchy 4.2.0

A systems scripting language that transpiles to idiomatic Rust with extreme quality engineering
Documentation
// 36_reactive_programming.ruchy - Reactive programming and event streams

import std::reactive
import std::events
import std::time

fn main() {
    println("=== Reactive Programming ===\n")

    // --- Observable streams -------------------------------------------
    println("=== Observable Streams ===")

    // A stream built from an in-memory list of five integers.
    let numbers = Observable::from([1, 2, 3, 4, 5])

    // subscribe is given three callbacks here, in this order:
    // one per value, one for an error, one for completion.
    numbers.subscribe(
        value => println(f"Received: {value}"),
        error => println(f"Error: {error}"),
        () => println("Complete!")
    )

    // A stream fed by "click" events.
    let clicks = Observable::from_event("click")

    // Collect clicks into 500ms batches, keep only the batches that hold
    // exactly two clicks, and turn each surviving batch into a message.
    let click_batches = clicks.buffer_time(500)
    let click_pairs = click_batches.filter(batch => batch.len() == 2)
    let double_clicks = click_pairs.map(_ => "Double click detected!")

    double_clicks.subscribe(text => println(text))

    // --- Subject ------------------------------------------------------
    // A Subject is used here from both sides: observers subscribe to it
    // and values are pushed into it with next().
    println("\n=== Subject ===")

    let subject = Subject::new()

    // Two observers attached to the same subject; the second prints
    // twice the emitted value.
    subject.subscribe(x => println(f"Observer 1: {x}"))
    subject.subscribe(x => println(f"Observer 2: {x * 2}"))

    // Push 1, 2, 3 in order, then signal completion.
    for value in [1, 2, 3] {
        subject.next(value)
    }
    subject.complete()

    // --- BehaviorSubject ----------------------------------------------
    // Created with an initial value (0) and, as this demo intends to
    // show, remembers the most recent value for new subscribers.
    println("\n=== BehaviorSubject ===")

    let behavior_subject = BehaviorSubject::new(0)

    // Callbacks for the two subscribers, named for when they attach.
    let early = x => println(f"Early subscriber: {x}")
    let late = x => println(f"Late subscriber: {x}")

    behavior_subject.subscribe(early)
    for value in [1, 2] {
        behavior_subject.next(value)
    }

    // Attached only after 1 and 2 were emitted: the late subscriber is
    // expected to receive the last value right away, then 3 with everyone else.
    behavior_subject.subscribe(late)
    behavior_subject.next(3)

    // --- ReplaySubject ------------------------------------------------
    // Configured with buffer_size 3, so it keeps several past values
    // rather than just one.
    println("\n=== ReplaySubject ===")

    let replay_subject = ReplaySubject::new(buffer_size: 3)

    // Four values go in before anyone subscribes.
    for letter in ["A", "B", "C", "D"] {
        replay_subject.next(letter)
    }

    // A subscriber arriving now is replayed the last three of them.
    replay_subject.subscribe(x => println(f"Replayed: {x}"))

    // --- Stream operators ---------------------------------------------
    println("\n=== Stream Operators ===")

    // map: double every value of interval(1000), stopping after five values.
    let doubled_ticks = Observable::interval(1000)
        .map(tick => tick * 2)
        .take(5)
    doubled_ticks.subscribe(x => println(f"Doubled: {x}"))

    // filter: keep only the even values of range(1, 10).
    let evens = Observable::range(1, 10).filter(n => n % 2 == 0)
    evens.subscribe(x => println(f"Even: {x}"))

    // flat_map: map each word to a stream of its characters and flatten
    // those inner streams into a single stream.
    let words = Observable::from(["hello", "world"])
    let letters = words.flat_map(word => Observable::from(word.chars()))
    letters.subscribe(char => println(f"Char: {char}"))

    // scan: running accumulation seeded with 0.
    let running_totals = Observable::from([1, 2, 3, 4, 5])
        .scan(0, (total, item) => total + item)
    running_totals.subscribe(sum => println(f"Running sum: {sum}"))

    // debounce: wait 300ms after the last "input" event, skip consecutive
    // duplicates, then hand the query to search_api via switch_map.
    // This pipeline is only built here; nothing in this section subscribes to it.
    let settled_input = Observable::from_event("input").debounce_time(300)
    let search_input = settled_input
        .distinct_until_changed()
        .switch_map(query => search_api(query))

    // throttle: let at most one "scroll" event through per 100ms and map
    // it to the current scroll position. Also built but not subscribed here.
    let throttled_scroll = Observable::from_event("scroll").throttle_time(100)
    let scroll_events = throttled_scroll.map(_ => get_scroll_position())

    // --- Combining streams --------------------------------------------
    println("\n=== Combining Streams ===")

    // merge: forward the values of both interval streams into one stream,
    // capped at six values in total.
    let stream1 = Observable::interval(1000).map(x => f"Stream1: {x}")
    let stream2 = Observable::interval(1500).map(x => f"Stream2: {x}")

    let merged = Observable::merge(stream1, stream2).take(6)
    merged.subscribe(line => println(line))

    // zip: pair values from each stream by position (first with first,
    // second with second, ...) and combine each pair with the given function.
    let names = Observable::from(["Alice", "Bob", "Charlie"])
    let ages = Observable::from([25, 30, 35])

    let introductions = Observable::zip(names, ages, (name, age) => f"{name} is {age} years old")
    introductions.subscribe(line => println(line))

    // combine_latest: emit whenever any input emits, combining the most
    // recent value of every input.
    let temperature = BehaviorSubject::new(20)
    let humidity = BehaviorSubject::new(50)

    let readings = Observable::combine_latest(
        temperature,
        humidity,
        (t, h) => f"Temp: {t}°C, Humidity: {h}%"
    )
    readings.subscribe(line => println(line))

    // Each update below should produce a fresh combined reading.
    temperature.next(22)
    humidity.next(55)

    // --- Error handling -----------------------------------------------
    println("\n=== Error Handling ===")

    // The third element is 0, so 10 / x fails there; catch_error swaps in
    // a fallback stream that emits -1.
    // NOTE(review): presumably the values after the failing element are
    // not processed once the fallback takes over - confirm.
    let quotients = Observable::from([1, 2, 0, 4]).map(x => 10 / x)
    let guarded = quotients.catch_error(_ => Observable::of(-1))
    guarded.subscribe(x => println(f"Result: {x}"))

    // Factory for a simulated unreliable call: fails when random() is
    // below 0.5 and succeeds otherwise.
    let attempt = () => {
        if random() < 0.5 {
            Observable::throw("Network error")
        } else {
            Observable::of("Success!")
        }
    }
    let flaky_api = Observable::defer(attempt)

    // Retry up to 3 times; the error callback only fires if the retries
    // do not yield a success.
    let with_retries = flaky_api.retry(3)
    with_retries.subscribe(
        x => println(f"API result: {x}"),
        err => println(f"API failed after retries: {err}")
    )

    // --- Backpressure -------------------------------------------------
    // Three strategies for a producer that outpaces its consumer.
    println("\n=== Backpressure ===")

    let fast_producer = Observable::interval(10)

    // Strategy 1 - buffer: hold up to 100 pending items while the
    // consumer, observed on the computation scheduler, works through them.
    let buffered = fast_producer
        .on_backpressure_buffer(100)
        .observe_on(Scheduler::computation())
    let slow_consumer = buffered.subscribe(x => {
        sleep_ms(50)  // simulate slow per-item work
        println(f"Processed: {x}")
    })

    // Strategy 2 - drop: discard items the consumer cannot keep up with.
    let dropping = fast_producer.on_backpressure_drop()
    dropping.subscribe(item => process_slowly(item))

    // Strategy 3 - latest: keep only the most recent item.
    let latest_only = fast_producer.on_backpressure_latest()
    latest_only.subscribe(item => process_slowly(item))

    // --- Custom operators ---------------------------------------------
    println("\n=== Custom Operators ===")

    // Builds a custom operator: a function from an Observable<float> to a
    // stream of averages over the most recent `window_size` values.
    // The scan state carries the current window together with its sum and
    // length; the final map divides one by the other.
    // NOTE(review): with window_size < 1 the window is emptied on every
    // step and the division is by a count of 0 - confirm callers always
    // pass a positive size.
    fn moving_average(window_size: int) {
        |source: Observable<float>| {
            // Seed state: nothing seen yet.
            let empty_window = { sum: 0.0, count: 0, values: [] }

            // One scan step: add the new value, drop the oldest one if the
            // window grew past window_size, then recompute the summary.
            let slide = |acc, val| {
                let mut recent = acc.values
                recent.append(val)

                if recent.len() > window_size {
                    recent = recent[1..]
                }

                {
                    sum: recent.sum(),
                    count: recent.len(),
                    values: recent
                }
            }

            let windows = source.scan(empty_window, slide)
            windows.map(state => state.sum / state.count)
        }
    }

    // Apply the operator with a window of 3 via pipe().
    let samples = Observable::from([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
    let smoothed = samples.pipe(moving_average(3))
    smoothed.subscribe(avg => println(f"Moving average: {avg:.2}"))

    // Hot vs Cold observables
    println("\n=== Hot vs Cold Observables ===")

    // Cold observable - the producer function runs again for every
    // subscriber, so each subscription gets its own random() value.
    let cold = Observable::create(observer => {
        println("Cold observable executing")
        observer.next(random())
        observer.complete()
    })

    cold.subscribe(x => println(f"Subscriber 1: {x}"))
    cold.subscribe(x => println(f"Subscriber 2: {x}"))  // Different value

    // Hot observable - one shared execution for all subscribers.
    // publish() yields a connectable stream that stays idle until
    // connect() is called.
    let hot = Observable::interval(1000).publish()

    hot.subscribe(x => println(f"Hot subscriber 1: {x}"))

    // Start emitting now, before the late subscriber exists. Connecting
    // only after both subscriptions (as this demo previously did) would
    // hand both subscribers the same values and hide the hot behaviour.
    hot.connect()

    // Let the shared stream run for a while with only subscriber 1 attached.
    sleep_ms(2500)
    hot.subscribe(x => println(f"Hot subscriber 2: {x}"))  // Misses first values

    // --- Schedulers ---------------------------------------------------
    println("\n=== Schedulers ===")

    // Subscription happens on the IO scheduler, while values are handed
    // to the async scheduler before heavy_computation runs on them.
    let computed = Observable::from([1, 2, 3])
        .observe_on(Scheduler::async())
        .map(input => heavy_computation(input))
        .subscribe_on(Scheduler::io())
    computed.subscribe(x => println(f"Computed: {x}"))

    // Event bus
    // A small publish/subscribe hub: one Subject per event-type string,
    // created lazily the first time that event type is mentioned.
    println("\n=== Event Bus ===")

    struct EventBus {
        // Event-type name -> the Subject carrying that event's payloads.
        // Starts out empty.
        subjects: map<string, Subject> = {}
    }

    impl EventBus {
        // Publishes `data` to everyone listening for `event_type`.
        // Creates the Subject for `event_type` first if none exists yet.
        // NOTE(review): takes `mut self`; confirm that the insert into
        // `self.subjects` is visible to later calls on the same bus.
        fn emit(mut self, event_type: string, data: any) {
            if event_type not in self.subjects {
                self.subjects[event_type] = Subject::new()
            }
            self.subjects[event_type].next(data)
        }

        // Returns the stream of payloads for `event_type`, exposed through
        // as_observable(). Creates the Subject first if none exists yet, so
        // listeners can register before anything has been emitted.
        fn on(mut self, event_type: string) -> Observable {
            if event_type not in self.subjects {
                self.subjects[event_type] = Subject::new()
            }
            self.subjects[event_type].as_observable()
        }
    }

    // Relies on the field default for `subjects`.
    let event_bus = EventBus {}

    // Register listeners before emitting; each one reads `name` from the payload.
    event_bus.on("user_login")
        .subscribe(user => println(f"User logged in: {user.name}"))

    event_bus.on("user_logout")
        .subscribe(user => println(f"User logged out: {user.name}"))

    // Emit one event of each type with the same payload shape.
    event_bus.emit("user_login", { name: "Alice", id: 1 })
    event_bus.emit("user_logout", { name: "Alice", id: 1 })

    // --- State management ---------------------------------------------
    // A minimal store: the current state lives in a BehaviorSubject,
    // actions go through a reducer, and views are derived with select().
    println("\n=== State Management ===")

    struct Store<State> {
        // Holds the current state and notifies subscribers of new ones.
        state: BehaviorSubject<State>
    }

    impl<State> Store<State> {
        // Creates a store whose first state is `initial`.
        fn new(initial: State) -> Store<State> {
            let subject = BehaviorSubject::new(initial)
            Store { state: subject }
        }

        // Derives a stream of `selector(state)` values, with consecutive
        // duplicates removed by distinct_until_changed().
        fn select<T>(self, selector: fn(State) -> T) -> Observable<T> {
            let projected = self.state.map(selector)
            projected.distinct_until_changed()
        }

        // Computes the next state from the current one and `action`, then
        // publishes it.
        // NOTE(review): `Action` and `reducer` are not defined in the
        // visible part of this file - confirm where they come from.
        fn dispatch(self, action: Action) {
            self.state.next(reducer(self.state.get_value(), action))
        }
    }

    // Usage: watch only the `count` field of the state.
    let store = Store::new({ count: 0, user: None })

    let count_changes = store.select(state => state.count)
    count_changes.subscribe(count => println(f"Count changed: {count}"))

    // NOTE(review): Increment / Decrement are likewise not defined in the
    // visible part of this file.
    store.dispatch(Increment)
    store.dispatch(Increment)
    store.dispatch(Decrement)
}