package main
import (
"bufio"
"context"
"fmt"
"io"
"log"
"net"
"time"
"github.com/reactivex/rxgo/v2"
)
// main listens on TCP 0.0.0.0:8080 and serves clients forever. Each accepted
// connection gets two goroutines: readclose, which logs the lines the client
// sends and closes the connection when reading ends, and subscribe, which
// forwards the items of the shared observable to the client.
func main() {
	// Pause between retries after a failed Accept so that a persistent
	// failure does not turn the loop into a busy spin.
	const acceptRetryDelay = 100 * time.Millisecond

	ln, err := net.Listen("tcp", "0.0.0.0:8080")
	if err != nil {
		log.Fatalln(err)
	}
	fmt.Println("listening on", ln.Addr())

	// A single observable is created once and shared by every client.
	observable := generator()

	id := 0
	for {
		conn, err := ln.Accept()
		if err != nil {
			// Keep serving, but make the failure visible instead of
			// dropping it silently.
			log.Printf("accept: %v", err)
			time.Sleep(acceptRetryDelay)
			continue
		}
		fmt.Printf("client[%d] connected from %s\n", id, conn.RemoteAddr())
		go readclose(conn, id)
		go subscribe(conn, observable)
		// Only successful connections consume an id, so client numbers
		// stay consecutive.
		id++
	}
}
// readclose prints every line read from conn, prefixed with the client id,
// until reading stops. It then prints a disconnect message naming the reason
// reading ended and closes conn.
//
// The reason is io.EOF when the input simply ended; otherwise it is the error
// reported by the scanner (for example a failed read or an over-long line).
func readclose(conn io.ReadCloser, id int) {
	// bufio.Scanner reports a clean end of input as a nil error, so EOF is
	// the default reason unless the scanner says otherwise.
	var reason error = io.EOF
	defer func() {
		fmt.Printf("client[%d] disconnected: read: %v\n", id, reason)
		// Best-effort close: nothing useful can be done if it fails.
		conn.Close()
	}()

	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		fmt.Printf("client[%d]: %s\n", id, scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		reason = err
	}
}
// subscribe registers an observer on observable that writes each emitted item
// to conn as one "%v"-formatted line.
//
// The first write error is logged and every later item is discarded without
// touching conn again, so a disconnected client is not written to on every
// emission.
//
// NOTE(review): the observer is deliberately left registered after a write
// failure. Whether an observer can be detached from a published observable
// without stalling the other subscribers should be confirmed against the
// rxgo documentation before adding cancellation here.
func subscribe(conn io.Writer, observable rxgo.Observable) {
	// NOTE(review): assumes rxgo invokes the callback sequentially from a
	// single goroutine, so this flag needs no synchronization — confirm.
	failed := false
	observable.DoOnNext(func(i interface{}) {
		if failed {
			return
		}
		if _, err := fmt.Fprintf(conn, "%v\n", i); err != nil {
			failed = true
			log.Printf("subscribe: write: %v", err)
		}
	})
}
// generator builds the observable shared by all clients. A background
// goroutine sends the current time on an unbuffered channel and then sleeps
// for one second, forever; the channel is wrapped with the publish strategy
// and connected immediately using a background context.
func generator() rxgo.Observable {
	ch := make(chan rxgo.Item)
	go func() {
		// The loop never terminates, so the channel is never closed; the
		// goroutine lives for the rest of the process.
		for {
			ch <- rxgo.Of(time.Now())
			time.Sleep(time.Second)
		}
	}()
	observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())
	// Connect is called before any observer exists.
	// NOTE(review): presumably items emitted while nobody is subscribed are
	// dropped — confirm against the rxgo connectable-observable docs.
	observable.Connect(context.Background())
	return observable
}