package common
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"canmi.net/vane-mock-tests/pkg/config/advanced"
"canmi.net/vane-mock-tests/pkg/env"
"canmi.net/vane-mock-tests/pkg/term"
)
const executorPluginSrc = `
package main
import (
"os"
"time"
)
func main() {
if os.Getenv("MODE") == "sleep" {
time.Sleep(3 * time.Second)
}
if os.Getenv("MODE") == "fail" {
os.Stderr.WriteString("intentional failure")
os.Exit(1)
}
// Success branch output
os.Stdout.Write([]byte("{\"branch\":\"success\"}"))
}
`
func TestFlowTimeout(ctx context.Context, s *env.Sandbox) error {
debug, _ := ctx.Value(env.DebugKey).(bool)
bin, err := s.CompileGoBin(executorPluginSrc)
if err != nil {
return err
}
s.Env["FLOW_EXECUTION_TIMEOUT_SECS"] = "1"
token := "timeout-token-long-enough-1234"
s.Env["ACCESS_TOKEN"] = token
proc, err := s.StartVane(ctx, debug)
if err != nil {
return err
}
defer proc.Stop()
if err := registerExecutorPlugin(s.ConsolePort, token, bin, "sleep"); err != nil {
return err
}
ports, _ := env.GetFreePorts(1)
vanePort := ports[0]
if err := writeExecutorFlow(s, vanePort); err != nil {
return err
}
if err := proc.WaitForTcpPort(vanePort, 5*time.Second); err != nil {
return err
}
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/", vanePort))
if err != nil {
return nil
}
defer resp.Body.Close()
if resp.StatusCode < 500 {
return term.FormatFailure(fmt.Sprintf("Expected timeout error, got status %d", resp.StatusCode), nil)
}
if err := proc.WaitForLog("Flow execution timed out", 2*time.Second); err != nil {
return term.FormatFailure("Vane did not log timeout message", term.NewNode(proc.DumpLogs()))
}
return nil
}
func TestExternalCircuitBreaker(ctx context.Context, s *env.Sandbox) error {
debug, _ := ctx.Value(env.DebugKey).(bool)
bin, err := s.CompileGoBin(executorPluginSrc)
if err != nil {
return err
}
s.Env["EXTERNAL_PLUGIN_QUIET_PERIOD_SECS"] = "2"
token := "cb-token-long-enough-123456789"
s.Env["ACCESS_TOKEN"] = token
proc, err := s.StartVane(ctx, debug)
if err != nil {
return err
}
defer proc.Stop()
if err := registerExecutorPlugin(s.ConsolePort, token, bin, "fail"); err != nil {
return err
}
ports, _ := env.GetFreePorts(1)
vanePort := ports[0]
if err := writeExecutorFlow(s, vanePort); err != nil {
return err
}
proc.WaitForTcpPort(vanePort, 5*time.Second)
client := &http.Client{Timeout: 2 * time.Second}
url := fmt.Sprintf("http://127.0.0.1:%d/", vanePort)
resp1, _ := client.Get(url)
if resp1 != nil {
resp1.Body.Close()
}
if err := proc.WaitForLog("Marking as failed in Circuit Breaker", 2*time.Second); err != nil {
return term.FormatFailure("Vane did not activate circuit breaker", term.NewNode(proc.DumpLogs()))
}
resp2, err := client.Get(url)
if err != nil {
return term.FormatFailure("Circuit breaker request failed", term.NewNode(err.Error()))
}
defer resp2.Body.Close()
if resp2.StatusCode != 503 {
return term.FormatFailure(fmt.Sprintf("Circuit breaker did not return 503, got %d", resp2.StatusCode), nil)
}
body, _ := io.ReadAll(resp2.Body)
if !bytes.Contains(body, []byte("Circuit Breaker Active")) {
return term.FormatFailure("Wrong response body for circuit breaker", term.NewNode(string(body)))
}
time.Sleep(2500 * time.Millisecond)
resp3, _ := client.Get(url)
if resp3 != nil {
resp3.Body.Close()
}
if bytes.Contains([]byte(proc.DumpLogs()), []byte("is in quiet period")) {
}
return nil
}
func registerExecutorPlugin(consolePort int, token, bin, mode string) error {
url := fmt.Sprintf("http://127.0.0.1:%d/plugins/executor.tester", consolePort)
payload := map[string]interface{}{
"name": "executor.tester",
"role": "middleware",
"driver": map[string]interface{}{
"type": "command",
"program": bin,
"env": map[string]string{"MODE": mode},
},
"output": []string{"success", "failure"},
}
jb, _ := json.Marshal(payload)
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(jb))
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", "application/json")
resp, err := (&http.Client{}).Do(req)
if err != nil || (resp.StatusCode != 200 && resp.StatusCode != 201) {
return fmt.Errorf("plugin registration failed")
}
resp.Body.Close()
return nil
}
func writeExecutorFlow(s *env.Sandbox, vanePort int) error {
l7Conf := advanced.ApplicationConfig{
Pipeline: advanced.ProcessingStep{
"executor.tester": advanced.PluginInstance{
Input: map[string]interface{}{},
Output: map[string]advanced.ProcessingStep{
"success": {
"internal.terminator.response": advanced.PluginInstance{
Input: map[string]interface{}{"status": 200, "body": "OK"},
},
},
"failure": {
"internal.terminator.response": advanced.PluginInstance{
Input: map[string]interface{}{"status": 503, "body": "Circuit Breaker Active"},
},
},
},
},
},
}
l7Bytes, _ := json.Marshal(l7Conf)
s.WriteConfig("application/httpx.json", l7Bytes)
l4pConf := advanced.L4FlowConfig{Connection: advanced.NewUpgrade("httpx")}
l4pBytes, _ := json.Marshal(l4pConf)
s.WriteConfig("resolver/http.json", l4pBytes)
l4Conf := advanced.L4FlowConfig{Connection: advanced.NewUpgrade("http")}
l4Bytes, _ := json.Marshal(l4Conf)
return s.WriteConfig(fmt.Sprintf("listener/[%d]/tcp.json", vanePort), l4Bytes)
}