package app
import (
"context"
"fmt"
"io"
"log/slog"
"os"
"sync"
"time"
"wrpc.io/examples/go/streams-client/bindings/wrpc_examples/streams/handler"
wrpc "wrpc.io/go"
)
// init installs the process-wide default slog logger: a text handler that
// writes to stderr at Info level and omits the timestamp attribute from
// every record.
func init() {
	// dropTime blanks any attribute keyed slog.TimeKey; returning the zero
	// Attr tells the handler to discard it. All other attributes pass through.
	dropTime := func(_ []string, attr slog.Attr) slog.Attr {
		if attr.Key != slog.TimeKey {
			return attr
		}
		return slog.Attr{}
	}

	opts := &slog.HandlerOptions{
		Level:       slog.LevelInfo,
		ReplaceAttr: dropTime,
	}
	logger := slog.New(slog.NewTextHandler(os.Stderr, opts))
	slog.SetDefault(logger)
}
// ThrottleStream hands out the elements of values one at a time, waiting for
// a tick of its ticker before releasing each element.
type ThrottleStream[T any] struct {
	tick   *time.Ticker // paces Receive; one element is released per tick
	values []T          // elements not yet returned by Receive
}

// Receive blocks until the next tick and then returns a freshly allocated
// one-element slice holding the first pending value, which is removed from
// the stream. Once no values remain it returns nil and io.EOF without
// waiting for a tick.
func (ts *ThrottleStream[T]) Receive() ([]T, error) {
	if len(ts.values) < 1 {
		return nil, io.EOF
	}

	<-ts.tick.C

	head := ts.values[0]
	ts.values = ts.values[1:]
	return []T{head}, nil
}

// Close stops the underlying ticker. It always returns nil.
func (ts *ThrottleStream[T]) Close() error {
	ts.tick.Stop()
	return nil
}
// ThrottleReader is a reader that releases the bytes of buf one at a time,
// waiting for a tick of its ticker before each byte.
type ThrottleReader struct {
	tick *time.Ticker // paces Read; one byte is released per tick
	buf  []byte       // bytes not yet returned by Read
}

// Read blocks until the next tick, then copies exactly one pending byte into
// p[0] and returns 1, nil. Once no bytes remain it returns 0, io.EOF without
// waiting for a tick.
//
// A zero-length p (with bytes still pending) returns 0, nil immediately and
// consumes neither a tick nor a byte, as the io.Reader contract permits;
// previously this case panicked on the p[0] write.
func (s *ThrottleReader) Read(p []byte) (int, error) {
	if len(s.buf) == 0 {
		return 0, io.EOF
	}
	if len(p) == 0 {
		// Nowhere to store a byte: report "nothing happened" instead of
		// indexing into an empty slice.
		return 0, nil
	}

	<-s.tick.C

	p[0] = s.buf[0]
	s.buf = s.buf[1:]
	return 1, nil
}

// Close stops the underlying ticker. It always returns nil.
//
// NOTE(review): Ticker.Stop does not close the tick channel, so a Read issued
// after Close while bytes are still pending may block.
func (s *ThrottleReader) Close() error {
	s.tick.Stop()
	return nil
}
// Run invokes `wrpc-examples:streams/handler.echo` through client, passing a
// throttled stream of the numbers 1..10 and a throttled byte stream holding
// "foo bar baz" (both paced by one-second tickers), and prints every echoed
// chunk to stdout on a line that starts with prefix.
//
// It returns a non-nil error only when the invocation itself fails. Errors
// hit while receiving echoed chunks, and errors delivered on the error
// channel returned by Echo, are logged via slog and do not change the return
// value. Run returns after that error channel has been drained and both
// receive goroutines have finished.
func Run(prefix string, client wrpc.Invoker) error {
	numbers, byteStream, errCh, err := handler.Echo(context.Background(), client, &handler.Req{
		Numbers: &ThrottleStream[uint64]{
			tick:   time.NewTicker(time.Second),
			values: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
		},
		Bytes: &ThrottleReader{
			tick: time.NewTicker(time.Second),
			buf:  []byte("foo bar baz"),
		},
	})
	if err != nil {
		return fmt.Errorf("failed to call `wrpc-examples:streams/handler.echo`: %w", err)
	}

	var wg sync.WaitGroup
	// Deferred so Run does not return before both receive goroutines exit.
	defer wg.Wait()

	// Print echoed number chunks until the stream ends or fails.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer numbers.Close()
		for {
			chunk, err := numbers.Receive()
			if err == io.EOF {
				return
			}
			if err != nil {
				slog.Error("failed to receive number chunk", "err", err)
				return
			}
			fmt.Printf("%s numbers: %v\n", prefix, chunk)
		}
	}()

	// Print echoed byte chunks until the stream ends or fails.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer byteStream.Close()
		var chunk [128]byte
		for {
			n, err := byteStream.Read(chunk[:])
			// A Read may return data together with an error (including
			// io.EOF), so emit the n > 0 bytes before inspecting err;
			// otherwise the tail of the stream could be dropped. A read of
			// zero bytes carries no data and prints nothing.
			if n > 0 {
				fmt.Printf("%s bytes: %s\n", prefix, chunk[:n])
			}
			if err == io.EOF {
				return
			}
			if err != nil {
				slog.Error("failed to receive byte chunk", "err", err)
				return
			}
		}
	}()

	// Report transmission failures for the outgoing streams as they arrive.
	for err := range errCh {
		slog.Error("failed to transmit async parameters", "err", err)
	}
	return nil
}