package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/rustqueue/rustqueue-go/rustqueue"
)
func main() {
client := rustqueue.NewTcpClient("127.0.0.1", 6789,
rustqueue.WithAutoReconnect(true),
rustqueue.WithMaxReconnectAttempts(20),
rustqueue.WithReconnectDelay(500),
)
if err := client.Connect(); err != nil {
log.Fatalf("failed to connect: %v", err)
}
defer client.Disconnect()
fmt.Println("Connected to RustQueue TCP server")
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case <-sigCh:
fmt.Println("\nShutting down worker...")
return
default:
}
jobs, err := client.Pull("emails", 1)
if err != nil {
log.Printf("pull error: %v", err)
time.Sleep(time.Second)
continue
}
if len(jobs) == 0 {
time.Sleep(500 * time.Millisecond)
continue
}
job := jobs[0]
fmt.Printf("Processing job %s (%s)\n", job.ID, job.Name)
for i := 1; i <= 5; i++ {
time.Sleep(200 * time.Millisecond)
if err := client.Progress(job.ID, i*20, fmt.Sprintf("step %d/5", i)); err != nil {
log.Printf("progress error: %v", err)
}
if err := client.Heartbeat(job.ID); err != nil {
log.Printf("heartbeat error: %v", err)
}
}
if err := client.Ack(job.ID, map[string]string{"status": "processed"}); err != nil {
log.Printf("ack error: %v", err)
if _, failErr := client.Fail(job.ID, fmt.Sprintf("ack failed: %v", err)); failErr != nil {
log.Printf("fail error: %v", failErr)
}
continue
}
fmt.Printf("Completed job %s\n", job.ID)
}
}