import { filter_nil } from "std/collections"
import { merge, safe_parse } from "std/json"
// Remove a leading "sha256=" or "sha1=" scheme tag from a signature value and
// return the trimmed remainder. A nil signature is treated as the empty string.
fn __strip_signature_prefix(signature) {
  let candidate = trim(signature ?? "")
  for scheme in ["sha256=", "sha1="] {
    if starts_with(candidate, scheme) {
      // Element 0 is the empty text before the tag; element 1 is what follows.
      return split(candidate, scheme)[1]
    }
  }
  return candidate
}
/**
 * Check a caller-supplied HMAC signature against the HMAC of `body` under
 * `secret`.
 *
 * `algorithm` is trimmed and lower-cased; "sha256" / "hmac-sha256" (the
 * default) and "sha1" / "hmac-sha1" are recognised. Any other algorithm
 * yields false. A leading "sha256=" or "sha1=" tag on `signature` is removed
 * before the comparison, which is performed with constant_time_eq.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: verify_hmac_signature(body, signature, secret, algorithm)
 */
pub fn verify_hmac_signature(body, signature, secret, algorithm = "sha256") {
  let scheme = lowercase(trim(algorithm ?? "sha256"))
  let claimed = __strip_signature_prefix(signature)
  if scheme == "sha256" || scheme == "hmac-sha256" {
    return constant_time_eq(hmac_sha256(secret, body), claimed)
  }
  if scheme == "sha1" || scheme == "hmac-sha1" {
    return constant_time_eq(hmac_sha1(secret, body), claimed)
  }
  // Unsupported algorithm: reject rather than guess.
  return false
}
/**
 * Verify a JWT against a JSON Web Key Set.
 *
 * When `options.inline_jwks` is provided it is used directly and no request
 * is made. Otherwise the key set is fetched from `jwks_url` with
 * harness.net.get and parsed as JSON. Failures to obtain the key set are
 * returned as `{ok: false, claims: nil, error, ...}`; the actual token
 * verification is delegated to connector_shared_verify_jwt_inline.
 *
 * @effects: [net]
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: verify_jwt(token, jwks_url, options)
 */
pub fn verify_jwt(token, jwks_url, options = nil) {
  let opts = options ?? {}
  let inline_keys = opts.inline_jwks
  if inline_keys != nil {
    return connector_shared_verify_jwt_inline(token, inline_keys, opts)
  }
  if jwks_url == nil || trim(jwks_url) == "" {
    return {ok: false, claims: nil, error: "jwks_url is required unless options.inline_jwks is provided"}
  }
  let response = harness.net.get(jwks_url, {headers: {Accept: "application/json"}})
  let fetched = response.ok ?? false
  if !fetched {
    return {ok: false, claims: nil, error: response.body, status: response.status}
  }
  let remote_keys = safe_parse(response.body)
  if remote_keys == nil {
    return {ok: false, claims: nil, error: "JWKS response was not valid JSON", status: response.status}
  }
  return connector_shared_verify_jwt_inline(token, remote_keys, opts)
}
// Encode `params` as an application/x-www-form-urlencoded body: each entry
// becomes url_encode(key) + "=" + url_encode(to_string(value)), joined by "&".
fn __form_body(params) {
  var encoded = ""
  var is_first = true
  for entry in params {
    let pair = url_encode(entry.key) + "=" + url_encode(to_string(entry.value))
    if is_first {
      encoded = pair
      is_first = false
    } else {
      encoded = encoded + "&" + pair
    }
  }
  return encoded
}
// Render `value` as an integer string, left-padding a single digit with "0".
// Values that to_int cannot convert are rendered as "00".
fn __connector_http_pad2(value) {
  let digits = to_string(to_int(value) ?? 0)
  if len(digits) != 1 {
    return digits
  }
  return "0" + digits
}
// Map a three-letter English month abbreviation (any letter case) to its
// two-digit month number, e.g. "Mar" -> "03". Unknown names yield nil.
fn __connector_http_month_number(name) {
  let wanted = lowercase(name)
  let calendar = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]
  var ordinal = 1
  for abbreviation in calendar {
    if abbreviation == wanted {
      if ordinal < 10 {
        return "0" + to_string(ordinal)
      }
      return to_string(ordinal)
    }
    ordinal = ordinal + 1
  }
  return nil
}
// Parse an HTTP date of the form "Wed, 21 Oct 2015 07:28:00 GMT" and return
// how many milliseconds from the current harness clock time it lies in the
// future (0 when it is now or in the past). Returns nil when `raw` does not
// match the pattern, names an unknown month, or date_parse rejects it.
fn __connector_http_parse_http_date_ms(raw) {
  let pattern = "^[A-Za-z]{3},\\s+(\\d{1,2})\\s+([A-Za-z]{3})\\s+(\\d{4})\\s+(\\d{2}):(\\d{2}):(\\d{2})\\s+GMT$"
  let found = regex_captures(pattern, raw)
  if len(found) == 0 {
    return nil
  }
  // Capture groups: 0 day, 1 month name, 2 year, 3 hour, 4 minute, 5 second.
  let fields = found[0].groups
  let month = __connector_http_month_number(fields[1])
  if month == nil {
    return nil
  }
  // Rebuild the timestamp in ISO-8601 form for date_parse.
  let date_part = fields[2] + "-" + month + "-" + __connector_http_pad2(fields[0])
  let time_part = fields[3] + ":" + fields[4] + ":" + fields[5]
  let stamp = try {
    date_parse(date_part + "T" + time_part + "Z")
  }
  if is_err(stamp) {
    return nil
  }
  let remaining_seconds = to_float(unwrap(stamp)) - harness.clock.timestamp()
  if remaining_seconds <= 0.0 {
    return 0
  }
  return to_int(remaining_seconds * 1000.0)
}
// Convert a Retry-After header value into a delay in milliseconds.
// Tried in order: a number of seconds, a timestamp accepted by date_parse,
// then the HTTP-date fallback parser. Blank input yields nil; non-positive
// delays yield 0.
fn __connector_http_parse_retry_after_ms(value) {
  let text = trim(to_string(value ?? ""))
  if text == "" {
    return nil
  }
  let numeric_seconds = to_float(text)
  if numeric_seconds != nil {
    if numeric_seconds <= 0.0 {
      return 0
    }
    return to_int(numeric_seconds * 1000.0)
  }
  let stamp = try {
    date_parse(text)
  }
  if is_err(stamp) {
    return __connector_http_parse_http_date_ms(text)
  }
  // Absolute time: the delay is the distance from the harness clock's "now".
  let remaining_seconds = to_float(unwrap(stamp)) - harness.clock.timestamp()
  if remaining_seconds <= 0.0 {
    return 0
  }
  return to_int(remaining_seconds * 1000.0)
}
// Normalise caller-supplied retry settings into one policy dict with the keys
// max_attempts, base_ms, cap_ms, respect_retry_after and retry_on.
//
// Settings are read from `options.retry_policy` (or `options.retry`) and from
// the top level of `options`:
//   - max_attempts: policy.max_attempts, then options.max_attempts, then
//     policy.max + 1 (`max` counts retries after the first try); default 1,
//     never below 1.
//   - base_ms: policy.base_ms / policy.backoff_ms / options.base_ms /
//     options.backoff_ms; default 200, never below 0.
//   - cap_ms: policy.cap_ms / options.cap_ms; default 30000, never below 0.
//   - respect_retry_after: options.respect_retry_after, then
//     policy.respect_retry_after; default true.
//   - retry_on: options.retry_on, then policy.retry_on; default
//     [408, 429, 500, 502, 503, 504].
fn __connector_http_retry_options(options) {
  let opts = options ?? {}
  let policy = opts.retry_policy ?? opts.retry ?? {}
  var max_attempts = to_int(policy?.max_attempts ?? opts?.max_attempts)
  if max_attempts == nil {
    let low_level_retries = to_int(policy?.max)
    if low_level_retries != nil {
      max_attempts = low_level_retries + 1
    } else {
      max_attempts = 1
    }
  }
  // max_attempts is non-nil from here on; only the lower bound needs clamping.
  if max_attempts < 1 {
    max_attempts = 1
  }
  var base_ms = to_int(policy?.base_ms ?? policy?.backoff_ms ?? opts?.base_ms ?? opts?.backoff_ms ?? 200)
  if base_ms == nil || base_ms < 0 {
    base_ms = 0
  }
  var cap_ms = to_int(policy?.cap_ms ?? opts?.cap_ms ?? 30000)
  if cap_ms == nil || cap_ms < 0 {
    cap_ms = 0
  }
  return {
    max_attempts: max_attempts,
    base_ms: base_ms,
    cap_ms: cap_ms,
    // Like retry_on, this is honoured inside the retry policy dict as well,
    // so {retry: {respect_retry_after: false}} is no longer silently ignored.
    respect_retry_after: opts?.respect_retry_after ?? policy?.respect_retry_after ?? true,
    retry_on: opts?.retry_on ?? policy?.retry_on ?? [408, 429, 500, 502, 503, 504],
  }
}
// Exponential backoff: policy.base_ms doubled once per prior attempt
// (`attempt` is zero-based), limited to policy.cap_ms.
//
// Doubling stops as soon as the delay exceeds the cap. The result is the same
// for the non-negative base_ms/cap_ms values produced by
// __connector_http_retry_options, but a large attempt number can no longer
// drive the intermediate value past the integer range.
fn __connector_http_retry_delay_ms(policy, attempt) {
  var delay = policy.base_ms
  var doublings = 0
  while doublings < attempt && delay <= policy.cap_ms {
    delay = delay * 2
    doublings = doublings + 1
  }
  if delay > policy.cap_ms {
    return policy.cap_ms
  }
  return delay
}
// True when `status` equals any entry of `retry_on` after to_int conversion.
// A nil `retry_on` is treated as an empty list.
fn __connector_http_retryable_status(status, retry_on) {
  var matched = false
  for candidate in retry_on ?? [] {
    if !matched {
      matched = to_int(candidate) == status
    }
  }
  return matched
}
// Classify an HTTP status code into a connector error category. Specific
// codes are matched first; any other status of 500 or above is
// "server_error", and everything else is "provider_error".
fn __connector_http_category(status) {
  let specific = [
    {code: 408, label: "timeout"},
    {code: 429, label: "rate_limit"},
    {code: 401, label: "auth"},
    {code: 403, label: "permission"},
    {code: 404, label: "not_found"},
    {code: 409, label: "conflict"},
    {code: 503, label: "overloaded"},
  ]
  for row in specific {
    if status == row.code {
      return row.label
    }
  }
  if status >= 500 {
    return "server_error"
  }
  return "provider_error"
}
// True when `method` (trimmed, upper-cased; nil treated as "") is one of the
// verbs this module retries without extra opt-in: GET, HEAD, PUT, DELETE,
// OPTIONS.
fn __connector_http_safe_method(method) {
  let verb = uppercase(trim(to_string(method ?? "")))
  let always_retryable = ["GET", "HEAD", "PUT", "DELETE", "OPTIONS"]
  return contains(always_retryable, verb)
}
// True when `headers` carries an Idempotency-Key header whose value is not
// blank. A nil `headers` is treated as an empty dict.
fn __connector_http_has_idempotency_key(headers) {
  let key = connector_http_header(headers ?? {}, "Idempotency-Key")
  if key == nil {
    return false
  }
  return trim(to_string(key)) != ""
}
// Decide whether a request may be retried at all:
//   - methods accepted by __connector_http_safe_method: always;
//   - POST / PATCH: when an Idempotency-Key header is present;
//   - anything else: only when options.retry_unsafe is set.
fn __connector_http_retry_allowed(method, headers, options) {
  if __connector_http_safe_method(method) {
    return true
  }
  let verb = uppercase(trim(to_string(method ?? "")))
  let takes_idempotency_key = verb == "POST" || verb == "PATCH"
  if takes_idempotency_key && __connector_http_has_idempotency_key(headers) {
    return true
  }
  return options?.retry_unsafe ?? false
}
// Build the outgoing header dict: a copy of options.headers, plus an
// "Idempotency-Key" header taken from options.idempotency_key when that
// option is non-blank and the caller has not already set such a header.
fn __connector_http_headers(options) {
  let opts = options ?? {}
  var outgoing = {}
  for entry in (opts.headers ?? {}).entries() {
    outgoing[entry.key] = entry.value
  }
  let requested_key = opts.idempotency_key
  if requested_key == nil {
    return outgoing
  }
  if trim(to_string(requested_key)) == "" {
    return outgoing
  }
  // An explicit header supplied by the caller wins over the option.
  if __connector_http_has_idempotency_key(outgoing) {
    return outgoing
  }
  outgoing["Idempotency-Key"] = to_string(requested_key)
  return outgoing
}
// Derive the options dict handed to harness.net.request: connector-only keys
// are removed, `headers` replaces any caller headers, and `retry` is forced
// to {max: 0, backoff_ms: 0} so that retrying is driven solely by this module.
fn __connector_http_request_options(options, headers) {
  let opts = options ?? {}
  // Keys consumed by the connector layer and never forwarded.
  let connector_only = [
    "base_ms",
    "backoff_ms",
    "cap_ms",
    "idempotency_key",
    "max_attempts",
    "operation",
    "provider",
    "redact_error_body",
    "retry",
    "retry_policy",
    "retry_unsafe",
  ]
  var forwarded = {}
  for entry in opts.entries() {
    let is_connector_key = contains(connector_only, entry.key)
    if !is_connector_key {
      forwarded[entry.key] = entry.value
    }
  }
  forwarded["headers"] = headers
  forwarded["retry"] = {max: 0, backoff_ms: 0}
  return forwarded
}
// Return the response body for inclusion in an error, or nil when
// options.redact_error_body asks for it to be withheld.
fn __connector_http_error_body(response, options) {
  let redact = options?.redact_error_body ?? false
  if !redact {
    return response?.body
  }
  return nil
}
// True only for a dict that has at least one key.
fn __connector_http_has_fields(value) {
  if type_of(value) != "dict" {
    return false
  }
  return len(value.keys()) > 0
}
// Build the structured error dict for a non-OK HTTP response. It always has
// category, status (0 when the response status is not an integer) and
// retryable; retry_after_ms, body, provider, operation and rate_limit are
// added only when available.
fn __connector_http_error_from_response(response, options, retryable) {
  let code = to_int(response?.status) ?? 0
  let limits = connector_http_rate_limit(response)
  var details = {category: __connector_http_category(code), status: code, retryable: retryable}
  let retry_hint = limits?.retry_after_ms
  if retry_hint != nil {
    details["retry_after_ms"] = retry_hint
  }
  // The body may be withheld by options.redact_error_body.
  let payload = __connector_http_error_body(response, options)
  if payload != nil {
    details["body"] = payload
  }
  if options?.provider != nil {
    details["provider"] = options.provider
  }
  if options?.operation != nil {
    details["operation"] = options.operation
  }
  if __connector_http_has_fields(limits) {
    details["rate_limit"] = limits
  }
  return details
}
// Classify an error raised by the network layer from its message text and
// return a structured error dict (category, retryable, message, plus
// provider/operation when set in `options`).
//
// Categories, first match wins: "egress_blocked", "invalid_request",
// "body_too_large", "timeout", otherwise "transient_network". Only
// "timeout" and "transient_network" are marked retryable.
fn __connector_http_error_from_exception(error, options) {
  let message = to_string(error)
  let haystack = lowercase(message)
  let invalid_request_markers = [
    "invalid url",
    "url must start",
    "invalid method",
    "invalid header",
    "mutually exclusive",
  ]
  var looks_invalid = false
  for marker in invalid_request_markers {
    if contains(haystack, marker) {
      looks_invalid = true
    }
  }
  var category = "transient_network"
  if contains(haystack, "egress") {
    category = "egress_blocked"
  } else if looks_invalid {
    category = "invalid_request"
  } else if contains(haystack, "max_response_bytes") {
    category = "body_too_large"
  } else if contains(haystack, "timeout") || contains(haystack, "timed out") {
    category = "timeout"
  }
  let retryable = category == "transient_network" || category == "timeout"
  return filter_nil(
    {
      category: category,
      retryable: retryable,
      message: message,
      provider: options?.provider,
      operation: options?.operation,
    },
  )
}
// Wrap a structured error into the failure envelope returned to callers:
// {ok: false, error: ...} plus status, headers, and a flattened copy of the
// error's body/category/retryable/retry_after_ms/provider/operation/rate_limit
// fields, each included only when present.
fn __connector_http_error_envelope(response, error) {
  var envelope = {ok: false, error: error}
  // Prefer the live response status; fall back to the one recorded on the error.
  let status = response?.status ?? error?.status
  if status != nil {
    envelope["status"] = status
  }
  let response_headers = response?.headers
  if response_headers != nil {
    envelope["headers"] = response_headers
  }
  let details = error ?? {}
  for field in ["body", "category", "retryable", "retry_after_ms", "provider", "operation"] {
    let value = details[field]
    if value != nil {
      envelope[field] = value
    }
  }
  if __connector_http_has_fields(error?.rate_limit) {
    envelope["rate_limit"] = error.rate_limit
  }
  return envelope
}
// Wrap an OK response into the success envelope: ok, status, headers
// (default {}), body (default ""), and, when available, final_url,
// retry_after_ms and the parsed rate_limit details.
fn __connector_http_success_envelope(response) {
  let limits = connector_http_rate_limit(response)
  var envelope = {ok: true, status: response.status, headers: response.headers ?? {}, body: response.body ?? ""}
  let landed_at = response?.final_url
  if landed_at != nil {
    envelope["final_url"] = landed_at
  }
  let retry_hint = limits?.retry_after_ms
  if retry_hint != nil {
    envelope["retry_after_ms"] = retry_hint
  }
  if __connector_http_has_fields(limits) {
    envelope["rate_limit"] = limits
  }
  return envelope
}
// True when retrying is permitted for this request, the response is not OK,
// and its status (0 when not an integer) is listed in retry_policy.retry_on.
fn __connector_http_should_retry_response(response, retry_policy, retry_allowed) {
  if !retry_allowed {
    return false
  }
  let succeeded = response?.ok ?? false
  if succeeded {
    return false
  }
  let code = to_int(response?.status) ?? 0
  return __connector_http_retryable_status(code, retry_policy.retry_on)
}
// Wait before the next attempt. The wait is the exponential backoff for
// `attempt`, raised to the server's retry_after_ms hint when the policy
// respects it. Returns false without sleeping when that hint exceeds
// retry_policy.cap_ms (the caller should give up); otherwise returns true.
fn __connector_http_sleep_before_retry(retry_policy, attempt, retry_after_ms) {
  let backoff_ms = __connector_http_retry_delay_ms(retry_policy, attempt)
  let honour_hint = retry_policy.respect_retry_after && retry_after_ms != nil
  if honour_hint && retry_after_ms > retry_policy.cap_ms {
    return false
  }
  var wait_ms = backoff_ms
  if honour_hint && retry_after_ms > wait_ms {
    wait_ms = retry_after_ms
  }
  if wait_ms > 0 {
    harness.clock.sleep_ms(wait_ms)
  }
  return true
}
/**
 * Look up an HTTP header by name, case-insensitively, in connector request
 * and response envelopes. The lookup is delegated to http_header; a nil
 * first argument is treated as an empty dict.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: connector_http_header(response, "Retry-After")
 */
pub fn connector_http_header(headers_or_response, name) {
  let source = headers_or_response ?? {}
  return http_header(source, name)
}
/**
 * Collect the rate-limit headers connector packages commonly expose.
 *
 * Reads Retry-After, the RateLimit-{Limit,Remaining,Reset} headers and their
 * X-RateLimit-* counterparts. The result holds the raw values under
 * ratelimit_* / x_ratelimit_* keys, merged limit / remaining / reset values
 * (unprefixed header first, X- header as fallback), the raw retry_after, and
 * retry_after_ms parsed into milliseconds. Entries with no value are dropped.
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: connector_http_rate_limit(response)
 */
pub fn connector_http_rate_limit(headers_or_response) {
  let source = headers_or_response
  let retry_after = connector_http_header(source, "Retry-After")
  let std_limit = connector_http_header(source, "RateLimit-Limit")
  let std_remaining = connector_http_header(source, "RateLimit-Remaining")
  let std_reset = connector_http_header(source, "RateLimit-Reset")
  let legacy_limit = connector_http_header(source, "X-RateLimit-Limit")
  let legacy_remaining = connector_http_header(source, "X-RateLimit-Remaining")
  let legacy_reset = connector_http_header(source, "X-RateLimit-Reset")
  let summary = {
    retry_after: retry_after,
    retry_after_ms: __connector_http_parse_retry_after_ms(retry_after),
    limit: std_limit ?? legacy_limit,
    remaining: std_remaining ?? legacy_remaining,
    reset: std_reset ?? legacy_reset,
    ratelimit_limit: std_limit,
    ratelimit_remaining: std_remaining,
    ratelimit_reset: std_reset,
    x_ratelimit_limit: legacy_limit,
    x_ratelimit_remaining: legacy_remaining,
    x_ratelimit_reset: legacy_reset,
  }
  return filter_nil(summary)
}
/**
* Send an HTTP request through harness.net.request with connector-level
* retry handling, returning a result envelope rather than raising.
*
* `method` is trimmed and upper-cased (nil means "GET"). `options` may carry
* request headers, an idempotency_key, retry settings (retry / retry_policy,
* max_attempts, base_ms, backoff_ms, cap_ms, retry_on, respect_retry_after,
* retry_unsafe) and error metadata (provider, operation, redact_error_body).
*
* Returns the success envelope ({ok: true, status, headers, body, ...}) for
* an OK response, or the error envelope ({ok: false, error, ...}) for a
* non-OK response or an error raised by the network call.
*
* @effects: [net, time]
* @allocation: heap
* @errors: []
* @api_stability: stable
* @example: connector_http_request("GET", url, {retry: {max_attempts: 3}})
*/
pub fn connector_http_request(method, url, options = nil) {
let opts = options ?? {}
let resolved_method = uppercase(trim(to_string(method ?? "GET")))
let headers = __connector_http_headers(opts)
let retry_policy = __connector_http_retry_options(opts)
// Whether this method/header combination may be retried at all.
let retry_allowed = __connector_http_retry_allowed(resolved_method, headers, opts)
// Options forwarded to the network call; its own retry setting is zeroed by
// __connector_http_request_options so this loop is the only retry driver.
let request_options = __connector_http_request_options(opts, headers)
// `attempt` is zero-based and counts attempts already completed.
var attempt = 0
while attempt < retry_policy.max_attempts {
// Capture raised errors as a result value instead of propagating them.
let result = try {
harness.net.request(resolved_method, url, request_options)
}
if is_err(result) {
let error = __connector_http_error_from_exception(unwrap_err(result), opts)
let error_retryable = error.retryable ?? false
// Retry only if permitted, the error is classed retryable, and at least
// one attempt remains.
let can_retry = retry_allowed && error_retryable && attempt + 1 < retry_policy.max_attempts
if !can_retry {
return __connector_http_error_envelope(nil, error)
}
// No response exists here, so there is no Retry-After hint to pass.
if !__connector_http_sleep_before_retry(retry_policy, attempt, nil) {
return __connector_http_error_envelope(nil, error)
}
attempt = attempt + 1
continue
}
let response = unwrap(result)
if response.ok ?? false {
return __connector_http_success_envelope(response)
}
let should_retry = __connector_http_should_retry_response(response, retry_policy, retry_allowed)
let retry_after_ms = connector_http_rate_limit(response)?.retry_after_ms
// The error records `should_retry` as its retryable flag even when this
// was the last attempt.
let error = __connector_http_error_from_response(response, opts, should_retry)
if !should_retry || attempt + 1 >= retry_policy.max_attempts {
return __connector_http_error_envelope(response, error)
}
// A false return means the Retry-After hint exceeds the policy cap:
// give up and surface the error instead of waiting.
if !__connector_http_sleep_before_retry(retry_policy, attempt, retry_after_ms) {
return __connector_http_error_envelope(response, error)
}
attempt = attempt + 1
}
// Fallback for the case where the loop ends without returning; every
// iteration on the final attempt returns, so this is reached only if the
// loop body never runs.
return __connector_http_error_envelope(
nil,
{
category: "transient_network",
retryable: true,
message: "connector_http_request exhausted without a response",
provider: opts?.provider,
operation: opts?.operation,
},
)
}
/**
 * Perform connector_http_request and decode the response body as JSON.
 *
 * Failure envelopes from the request are returned unchanged. On success the
 * envelope gains a `json` field: nil for a blank body, otherwise the parsed
 * value. A body that is not valid JSON produces an error envelope with
 * category "invalid_json" (retryable: false); the raw body is included
 * unless options.redact_error_body is set.
 *
 * @effects: [net, time]
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: connector_http_json("GET", url, {retry: {max_attempts: 3}})
 */
pub fn connector_http_json(method, url, options = nil) {
  let response = connector_http_request(method, url, options)
  let succeeded = response.ok ?? false
  if !succeeded {
    return response
  }
  let body_text = trim(to_string(response?.body ?? ""))
  if body_text == "" {
    return response + {json: nil}
  }
  let decoded = try {
    json_parse(response.body)
  }
  if is_err(decoded) {
    let opts = options ?? {}
    let reported_body = if opts?.redact_error_body ?? false {
      nil
    } else {
      response.body
    }
    let error = filter_nil(
      {
        category: "invalid_json",
        status: response.status,
        retryable: false,
        body: reported_body,
        provider: opts?.provider,
        operation: opts?.operation,
      },
    )
    return __connector_http_error_envelope(response, error)
  }
  return response + {json: unwrap(decoded)}
}
/**
 * Exchange an OAuth 2.0 refresh token for new tokens.
 *
 * POSTs a form-encoded `grant_type=refresh_token` request to `token_url`
 * with the client credentials and refresh token. `options.extra_params` is
 * merged over the form fields (nil-valued fields are dropped) and
 * `options.headers` over the default Accept / Content-Type headers.
 *
 * Returns `{ok: false, status, error}` for a non-OK response, where `error`
 * is the parsed JSON body or the raw body. Otherwise returns the parsed body
 * (or `{body: <raw>}` when it is not JSON) merged with `{ok: true, status}`.
 *
 * @effects: [net]
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: oauth2_token_refresh(client_id, client_secret, refresh_token, token_url, options)
 */
pub fn oauth2_token_refresh(client_id, client_secret, refresh_token, token_url, options = nil) {
  let opts = options ?? {}
  let grant = {
    grant_type: "refresh_token",
    client_id: client_id,
    client_secret: client_secret,
    refresh_token: refresh_token,
  }
  let form_fields = filter_nil(merge(grant, opts.extra_params ?? {}))
  let default_headers = {Accept: "application/json", "Content-Type": "application/x-www-form-urlencoded"}
  let request_headers = merge(default_headers, opts.headers ?? {})
  let response = harness.net.post(token_url, __form_body(form_fields), {headers: request_headers})
  let payload = safe_parse(response.body)
  if response.ok ?? false {
    return merge(payload ?? {body: response.body}, {ok: true, status: response.status})
  }
  return {ok: false, status: response.status, error: payload ?? response.body}
}
/**
 * Advance a token bucket by one request and report whether it is allowed.
 *
 * `config` may set capacity (default 60, minimum 1), refill_tokens (default
 * 1; non-positive values become 1) and refill_interval_ms (default 1000,
 * minimum 1). `state` is the `{tokens, last_refill_ms}` dict returned by a
 * previous call; nil starts from a full bucket. `now_ms` defaults to the
 * harness clock in milliseconds.
 *
 * Returns `{allowed, retry_after_ms, state}`. When the request is refused,
 * retry_after_ms is at least 1; when allowed it is 0 and one token is spent.
 *
 * NOTE(review): the refill is computed as
 * `elapsed_ms / refill_interval_ms * refill_tokens`. If `/` truncates for
 * integer operands, elapsed time shorter than one interval adds no tokens
 * while last_refill_ms still moves forward — confirm the intended semantics.
 *
 * @effects: [time]
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: rate_limit_token_bucket(state, config, now_ms)
 */
pub fn rate_limit_token_bucket(state = nil, config = nil, now_ms = nil) {
  let cfg = config ?? {}
  var bucket_size = cfg.capacity ?? 60
  if bucket_size < 1 {
    bucket_size = 1
  }
  var tokens_per_interval = cfg.refill_tokens ?? 1
  if tokens_per_interval <= 0 {
    tokens_per_interval = 1
  }
  var interval_ms = cfg.refill_interval_ms ?? 1000
  if interval_ms < 1 {
    interval_ms = 1
  }
  let now = now_ms ?? to_int(harness.clock.timestamp() * 1000)
  let elapsed_ms = now - (state?.last_refill_ms ?? now)
  var available = state?.tokens ?? bucket_size
  if elapsed_ms > 0 {
    // Refill for the elapsed time, never beyond the bucket size.
    available = available + elapsed_ms / interval_ms * tokens_per_interval
    if available > bucket_size {
      available = bucket_size
    }
  }
  if available >= 1 {
    return {allowed: true, retry_after_ms: 0, state: {tokens: available - 1, last_refill_ms: now}}
  }
  // Time until the missing fraction of a token has been refilled.
  var wait_ms = to_int((1 - available) * interval_ms / tokens_per_interval)
  if wait_ms < 1 {
    wait_ms = 1
  }
  return {allowed: false, retry_after_ms: wait_ms, state: {tokens: available, last_refill_ms: now}}
}
// Extract the item list from one page. With `items_path`, the list is read
// via json_pointer; without it, the first non-nil of page.results,
// page.items and page.data is used. Falls back to [] in both cases.
fn __page_items(page, items_path) {
  if items_path == nil {
    return page.results ?? page.items ?? page.data ?? []
  }
  return json_pointer(page, items_path) ?? []
}
/**
 * Walk a cursor-paginated resource and collect its pages and items.
 *
 * `fetch_fn(initial_url, cursor)` is called once per page, starting with
 * `options.initial_cursor`. The next cursor is read from each page with
 * json_pointer(page, cursor_path); paging stops when it is nil or "" or when
 * `options.max_pages` (default 100) pages have been fetched. Items are taken
 * from `options.items_path` when given, otherwise from the page's results /
 * items / data field.
 *
 * Returns `{complete, pages, items, next_cursor}`. `complete` is true when
 * the cursor ran out (next_cursor is then nil), and false when the page
 * limit stopped the walk (next_cursor is then the cursor to resume from).
 *
 * @effects: []
 * @allocation: heap
 * @errors: []
 * @api_stability: stable
 * @example: paginate_cursor(initial_url, fetch_fn, cursor_path, options)
 */
pub fn paginate_cursor(initial_url, fetch_fn, cursor_path, options = nil) {
  let opts = options ?? {}
  let page_limit = opts.max_pages ?? 100
  let items_path = opts.items_path
  var next = opts.initial_cursor
  var pages = []
  var items = []
  var fetched = 0
  var exhausted = false
  while !exhausted && fetched < page_limit {
    let page = fetch_fn(initial_url, next)
    pages = pages + [page]
    for item in __page_items(page, items_path) {
      items = items + [item]
    }
    fetched = fetched + 1
    next = json_pointer(page, cursor_path)
    exhausted = next == nil || next == ""
  }
  if exhausted {
    return {complete: true, pages: pages, items: items, next_cursor: nil}
  }
  return {complete: false, pages: pages, items: items, next_cursor: next}
}