package main
import (
"context"
"fmt"
"log"
"time"
"github.com/reactivex/rxgo/v2"
)
// hello builds an observable from a single string value, receives the first
// item it emits, and prints that item's value to stdout.
func hello() {
	greeting := <-rxgo.Just("Hello, World!")().Observe()
	fmt.Println(greeting.V)
}
// hot feeds the integers 0..2 into a channel from a background goroutine,
// wraps that channel in an observable, and then ranges over Observe() twice,
// printing every value received on each pass.
//
// NOTE(review): per the rxgo README a channel-backed observable is "hot", so
// the second pass is expected to find the stream already drained — confirm
// against the library documentation.
func hot() {
	source := make(chan rxgo.Item)
	go func() {
		for _, n := range []int{0, 1, 2} {
			source <- rxgo.Of(n)
		}
		close(source)
	}()

	stream := rxgo.FromChannel(source)

	// Two consecutive observations of the same observable.
	for pass := 0; pass < 2; pass++ {
		for emitted := range stream.Observe() {
			fmt.Println(emitted.V)
		}
	}
}
// cold builds an observable with rxgo.Defer from a single producer that emits
// the integers 0..2, then ranges over Observe() twice, printing every value
// received on each pass.
//
// NOTE(review): per the rxgo README a deferred observable is "cold", so each
// Observe() call is expected to re-run the producer — confirm against the
// library documentation.
func cold() {
	// emit is the sole producer; it ignores the context it is handed.
	emit := func(_ context.Context, out chan<- rxgo.Item) {
		for n := 0; n < 3; n++ {
			out <- rxgo.Of(n)
		}
	}
	stream := rxgo.Defer([]rxgo.Producer{emit})

	// drain prints everything one observation of the stream yields.
	drain := func() {
		for emitted := range stream.Observe() {
			fmt.Println(emitted.V)
		}
	}

	drain()
	drain()
}
// connectable demonstrates an observable created with the publish strategy.
//
// A background goroutine emits the integers 0..99, one per second, and then
// closes the source channel. Two observers are registered before Connect is
// called, and a third is registered three seconds after it.
//
// The function blocks until all three observers have finished, so the caller
// does not need to keep the process alive by other means. The cancel function
// returned by Connect is invoked on return to release the context.
//
// NOTE(review): this relies on each DoOnNext "Disposed" channel being closed
// once the source channel is closed — confirm against the rxgo documentation.
func connectable() {
	ch := make(chan rxgo.Item)
	go func() {
		defer close(ch)
		for i := 0; i < 100; i++ {
			ch <- rxgo.Of(i)
			time.Sleep(time.Second)
		}
	}()

	observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())

	// Keep the Disposed channels so completion can be awaited below.
	firstDone := observable.DoOnNext(func(i interface{}) {
		fmt.Printf("First observer: %d\n", i)
	})
	secondDone := observable.DoOnNext(func(i interface{}) {
		fmt.Printf("Second observer: %d\n", i)
	})

	_, cancel := observable.Connect(context.Background())
	defer cancel()

	// The third observer subscribes late, from its own goroutine; thirdDone is
	// closed only after that observer has finished as well.
	thirdDone := make(chan struct{})
	go func() {
		defer close(thirdDone)
		time.Sleep(3 * time.Second)
		<-observable.DoOnNext(func(i interface{}) {
			fmt.Printf("Third observer: %d\n", i)
		})
	}()

	<-firstDone
	<-secondDone
	<-thirdDone
}

// main runs each demo in turn, logging a heading before each one. It returns
// once connectable has finished; connectable itself waits for its observers,
// so no trailing `select {}` is needed (a bare `select {}` would end in a
// runtime "all goroutines are asleep" fatal error once the demo completed).
func main() {
	log.Println("Hello")
	hello()
	log.Println("Hot")
	hot()
	log.Println("Cold")
	cold()
	log.Println("Connectable")
	connectable()
}