// 36_reactive_programming.ruchy - Reactive programming and event streams
import std::reactive
import std::events
import std::time
fn main() {
// Demo banner, followed by the first section: observable streams.
println("=== Reactive Programming ===\n")
println("=== Observable Streams ===")

// An observable built from a fixed list of values.
let numbers = Observable::from([1, 2, 3, 4, 5])

// Three callbacks are passed to subscribe, in this order: one per value,
// one for an error, one for completion.
numbers.subscribe(
    |value| println(f"Received: {value}"),
    |error| println(f"Error: {error}"),
    () => println("Complete!")
)

// An observable fed by "click" events.
let clicks = Observable::from_event("click")

// Double-click detection, one stage per binding:
//   1. collect the clicks seen within each 500 ms window,
//   2. keep only the windows holding exactly two clicks,
//   3. replace each surviving window with a message.
let click_batches = clicks.buffer_time(500)
let click_pairs = click_batches.filter(|batch| batch.len() == 2)
let double_clicks = click_pairs.map(|_| "Double click detected!")

double_clicks.subscribe(|msg| println(msg))
// Subject: plays both roles, observer and observable.
println("\n=== Subject ===")
let subject = Subject::new()

// Two observers listen on the same subject; the second prints the doubled value.
subject.subscribe(|x| println(f"Observer 1: {x}"))
subject.subscribe(|x| println(f"Observer 2: {x * 2}"))

// Push 1, 2 and 3 in order, then signal completion.
for value in [1, 2, 3] {
    subject.next(value)
}
subject.complete()
// BehaviorSubject: a subject that remembers its last value (seeded with 0 here).
println("\n=== BehaviorSubject ===")
let behavior_subject = BehaviorSubject::new(0)

// The early subscriber is attached before any value is pushed.
behavior_subject.subscribe(|x| println(f"Early subscriber: {x}"))
for value in [1, 2] {
    behavior_subject.next(value)
}

// The late subscriber attaches after 1 and 2 were pushed; it is handed the
// last value immediately, then sees 3 like everyone else.
behavior_subject.subscribe(|x| println(f"Late subscriber: {x}"))
behavior_subject.next(3)
// ReplaySubject: remembers several past values (a buffer of 3 here).
println("\n=== ReplaySubject ===")
let replay_subject = ReplaySubject::new(buffer_size: 3)

// Four values are pushed before anyone subscribes.
for letter in ["A", "B", "C", "D"] {
    replay_subject.next(letter)
}

// A subscriber arriving now is replayed the last 3 values.
replay_subject.subscribe(|x| println(f"Replayed: {x}"))
// Stream operators: one short pipeline per operator.
println("\n=== Stream Operators ===")

// map: double every tick of a 1000 ms interval and stop after five values.
let ticks = Observable::interval(1000)
ticks
    .map(|x| x * 2)
    .take(5)
    .subscribe(|x| println(f"Doubled: {x}"))

// filter: keep only the even numbers of the range.
let evens = Observable::range(1, 10).filter(|x| x % 2 == 0)
evens.subscribe(|x| println(f"Even: {x}"))

// flat_map: expand each word into a stream of its characters.
let words = Observable::from(["hello", "world"])
words
    .flat_map(|word| Observable::from(word.chars()))
    .subscribe(|char| println(f"Char: {char}"))

// scan: running accumulation, starting from 0 and adding each value.
let addends = Observable::from([1, 2, 3, 4, 5])
addends
    .scan(0, |acc, val| acc + val)
    .subscribe(|sum| println(f"Running sum: {sum}"))

// debounce: wait 300 ms after the last "input" event, skip consecutive
// repeats, then hand the query to search_api via switch_map.
let input_events = Observable::from_event("input")
let search_input = input_events
    .debounce_time(300)
    .distinct_until_changed()
    .switch_map(|query| search_api(query))

// throttle: let through at most one "scroll" event per 100 ms and replace
// it with the current scroll position.
let raw_scrolls = Observable::from_event("scroll")
let scroll_events = raw_scrolls
    .throttle_time(100)
    .map(|_| get_scroll_position())
// Combining streams: merge, zip and combine_latest.
println("\n=== Combining Streams ===")

// merge: interleave two labelled interval streams and stop after six values.
let stream1 = Observable::interval(1000).map(|x| f"Stream1: {x}")
let stream2 = Observable::interval(1500).map(|x| f"Stream2: {x}")
let merged = Observable::merge(stream1, stream2)
merged
    .take(6)
    .subscribe(|x| println(x))

// zip: pair the values of both streams position by position (first with
// first, second with second, ...) and format each pair.
let names = Observable::from(["Alice", "Bob", "Charlie"])
let ages = Observable::from([25, 30, 35])
let introductions = Observable::zip(names, ages, |name, age| f"{name} is {age} years old")
introductions.subscribe(|x| println(x))

// combine_latest: emit whenever either source emits, using the most recent
// value of each.
let temperature = BehaviorSubject::new(20)
let humidity = BehaviorSubject::new(50)
let climate = Observable::combine_latest(
    temperature,
    humidity,
    |t, h| f"Temp: {t}°C, Humidity: {h}%"
)
climate.subscribe(|x| println(x))

// Update each source once, after the subscription above is in place.
temperature.next(22)
humidity.next(55)
// Error handling: recovering with a fallback stream, and retrying.
println("\n=== Error Handling ===")

// The third element is 0, so `10 / x` fails there; catch_error swaps in a
// fallback observable that yields -1.
let quotients = Observable::from([1, 2, 0, 4]).map(|x| 10 / x)
quotients
    .catch_error(|err| Observable::of(-1))
    .subscribe(|x| println(f"Result: {x}"))

// defer is given a factory that picks between a success stream and a
// failing one based on random().
let flaky_api = Observable::defer(() => {
    if random() >= 0.5 {
        Observable::of("Success!")
    } else {
        Observable::throw("Network error")
    }
})

// Retry up to 3 times; the second callback reports the error if the
// retries do not help.
let with_retries = flaky_api.retry(3)
with_retries.subscribe(
    |x| println(f"API result: {x}"),
    |err| println(f"API failed after retries: {err}")
)
// Backpressure: three strategies for a producer that outpaces its consumer.
println("\n=== Backpressure ===")

// Source ticking every 10 ms; each consumer below is slower than that.
let fast_producer = Observable::interval(10)

// Buffer strategy: hold up to 100 pending items while the consumer, running
// on the computation scheduler, spends 50 ms on each one.
let buffered = fast_producer.on_backpressure_buffer(100)
let slow_consumer = buffered
    .observe_on(Scheduler::computation())
    .subscribe(|x| {
        sleep_ms(50) // Slow processing
        println(f"Processed: {x}")
    })

// Drop strategy: discard items the consumer cannot keep up with.
let droppable = fast_producer.on_backpressure_drop()
droppable.subscribe(|x| process_slowly(x))

// Latest strategy: keep only the most recent item.
let latest_only = fast_producer.on_backpressure_latest()
latest_only.subscribe(|x| process_slowly(x))
// Custom operators
println("\n=== Custom Operators ===")

// Builds an operator, suitable for `pipe`, that replaces each value of a
// float stream with the average of the most recent `window_size` values
// seen so far (fewer while the window is still filling).
//
// window_size: how many trailing values to average. Anything below 1 is
//   treated as 1: with a smaller window every value would be evicted right
//   after being added, leaving an empty window and a 0.0 / 0 division.
// Returns: a function taking an Observable<float> and producing one average
//   per incoming value.
fn moving_average(window_size: int) {
    let size = if window_size < 1 { 1 } else { window_size }
    |source: Observable<float>| {
        source.scan(
            // The accumulator is just the current window; sum and length
            // are derived from it when the average is computed.
            [],
            |window, val| {
                let mut values = window
                values.append(val)
                // Evict the oldest value once the window is over capacity.
                if values.len() > size {
                    values = values[1..]
                }
                values
            }
        ).map(values => values.sum() / values.len())
    }
}

// Average each value with up to two predecessors and print with 2 decimals.
Observable::from([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
    .pipe(moving_average(3))
    .subscribe(avg => println(f"Moving average: {avg:.2}"))
// Hot vs Cold observables
println("\n=== Hot vs Cold Observables ===")

// Cold observable - new execution for each subscriber: the body passed to
// create emits one random() value and completes.
let cold = Observable::create(observer => {
    println("Cold observable executing")
    observer.next(random())
    observer.complete()
})
cold.subscribe(x => println(f"Subscriber 1: {x}"))
cold.subscribe(x => println(f"Subscriber 2: {x}")) // Different value

// Hot observable - shared execution. publish() gives an observable that
// only starts emitting once connect() is called.
let hot = Observable::interval(1000).publish()
hot.subscribe(x => println(f"Hot subscriber 1: {x}"))

// Connect BEFORE the pause. If connect() came after the second subscribe,
// nothing would be emitted during the sleep and both subscribers would see
// exactly the same values, so the demo would not show a late subscriber.
hot.connect() // Start emitting
sleep_ms(2500)
hot.subscribe(x => println(f"Hot subscriber 2: {x}")) // Misses first values
// Schedulers: choosing where subscription and processing take place.
println("\n=== Schedulers ===")

let inputs = Observable::from([1, 2, 3])

// Values are handed to the async scheduler before the heavy map step.
let computed = inputs
    .observe_on(Scheduler::async())
    .map(|x| heavy_computation(x))

// The subscription itself is set up on the IO scheduler.
computed
    .subscribe_on(Scheduler::io())
    .subscribe(|x| println(f"Computed: {x}"))
// Event bus
println("\n=== Event Bus ===")

// A registry of named event channels: one Subject per event type, created
// the first time that type is emitted to or listened on.
struct EventBus {
    subjects: map<string, Subject> = {}
}

impl EventBus {
    // Sends `data` on the `event_type` channel, creating the channel first
    // if it does not exist yet.
    //
    // Borrows the bus mutably rather than taking it by value, so the
    // Subject registered here lives in the caller's bus and is still there
    // for later emit/on calls.
    fn emit(&mut self, event_type: string, data: any) {
        if event_type not in self.subjects {
            self.subjects[event_type] = Subject::new()
        }
        self.subjects[event_type].next(data)
    }

    // Returns the `event_type` channel as an observable to subscribe to,
    // creating the channel first if it does not exist yet.
    //
    // Needs mutable access for the same reason as emit: it may insert a
    // new Subject into the registry.
    fn on(&mut self, event_type: string) -> Observable {
        if event_type not in self.subjects {
            self.subjects[event_type] = Subject::new()
        }
        self.subjects[event_type].as_observable()
    }
}

// The binding is `mut` because both methods may add entries to the bus.
let mut event_bus = EventBus {}

// Register the listeners first, then emit, so each event has a subscriber.
event_bus.on("user_login")
    .subscribe(user => println(f"User logged in: {user.name}"))
event_bus.on("user_logout")
    .subscribe(user => println(f"User logged out: {user.name}"))
event_bus.emit("user_login", { name: "Alice", id: 1 })
event_bus.emit("user_logout", { name: "Alice", id: 1 })
// State management with observables
println("\n=== State Management ===")

// A minimal store: the whole application state lives in one BehaviorSubject.
struct Store<State> {
    state: BehaviorSubject<State>
}

impl<State> Store<State> {
    // Creates a store whose state subject starts at `initial`.
    fn new(initial: State) -> Store<State> {
        Store { state: BehaviorSubject::new(initial) }
    }

    // Returns an observable of `selector(state)`, with consecutive repeats
    // removed by distinct_until_changed.
    //
    // Borrows the store instead of consuming it, so the same store can be
    // selected from and dispatched to repeatedly.
    fn select<T>(&self, selector: fn(State) -> T) -> Observable<T> {
        self.state.map(selector).distinct_until_changed()
    }

    // Reads the current state, runs it through `reducer` together with
    // `action`, and publishes the result as the new state.
    //
    // NOTE(review): `Action` and `reducer` are not defined in this section;
    // presumably they are provided elsewhere - confirm.
    fn dispatch(&self, action: Action) {
        let current = self.state.get_value()
        let next = reducer(current, action)
        self.state.next(next)
    }
}

// Usage: watch the `count` field, then dispatch three actions.
// NOTE(review): `Increment` / `Decrement` are likewise not defined here - confirm.
let store = Store::new({ count: 0, user: None })
store.select(state => state.count)
    .subscribe(count => println(f"Count changed: {count}"))
store.dispatch(Increment)
store.dispatch(Increment)
store.dispatch(Decrement)
}