harn-stdlib 0.8.163

Embedded Harn standard library source catalog
Documentation
/**
 * @harn-entrypoint-category agent.stdlib
 *
 * std/agent/stream — small streaming helpers for app-facing chat surfaces.
 * Harn owns transcript and stream semantics; hosts/apps own terminal rendering.
 */
fn __stream_dict(value) {
  if type_of(value) == "dict" {
    return value
  }
  return {}
}

fn __stream_string(value) -> string {
  if value == nil {
    return ""
  }
  return to_string(value)
}

fn __stream_non_empty(value, fallback) -> string {
  let text = trim(__stream_string(value))
  if text == "" {
    return fallback
  }
  return text
}

fn __stream_is_callable(value) -> bool {
  let kind = type_of(value)
  return kind == "function" || kind == "closure" || kind == "fn"
}

fn __stream_has_key(d, key) -> bool {
  return type_of(d) == "dict" && contains(d.keys(), key)
}

fn __stream_drop_keys(d, keys) {
  var out = __stream_dict(d)
  for key in keys {
    if __stream_has_key(out, key) {
      out = out.remove(key)
    }
  }
  return out
}

fn __stream_private_config(config) {
  let cfg = __stream_dict(config)
  let nested = __stream_dict(cfg?.private)
  let merged = cfg + nested
  return {
    open_tag: __stream_non_empty(merged?.open_tag, "<secret>"),
    close_tag: __stream_non_empty(merged?.close_tag, "</secret>"),
  }
}

fn __stream_tail(text: string, keep: int) -> string {
  if keep <= 0 || len(text) <= keep {
    return text
  }
  return text[len(text) - keep:len(text)]
}

fn __stream_open_prefix_suffix_len(text: string, open_tag: string) -> int {
  let limit = min(len(text), max(len(open_tag) - 1, 0))
  var size = limit
  while size > 0 {
    let suffix = text[len(text) - size:len(text)]
    if starts_with(open_tag, suffix) {
      return size
    }
    size = size - 1
  }
  return 0
}

/**
 * Strip private tagged spans from a complete text value.
 *
 * Defaults to `<secret>...</secret>` and truncates an unterminated private block
 * from the opening tag onward.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_private_text_filter(text, config = nil) -> string {
  let cfg = __stream_private_config(config)
  let open_tag = cfg.open_tag
  let close_tag = cfg.close_tag
  var rest = __stream_string(text)
  var out = ""
  for _ in range(1000) {
    let start = rest.index_of(open_tag)
    if start < 0 {
      return out + rest
    }
    out = out + rest[0:start]
    let after_open = rest[start + len(open_tag):len(rest)]
    let end = after_open.index_of(close_tag)
    if end < 0 {
      return out
    }
    rest = after_open[end + len(close_tag):len(after_open)]
  }
  return out + rest
}

/**
 * Create private-span streaming filter state.
 *
 * Use with `agent_private_stream_delta(...)` and
 * `agent_private_stream_finish(...)` when app code needs to render live deltas
 * without leaking an opening private tag split across provider chunks.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_private_stream_state(config = nil) {
  let cfg = __stream_private_config(config)
  return {
    config: cfg,
    pending: "",
    private_open: false,
    raw_text: "",
    visible_text: "",
    chunks: 0,
    withheld_suffix: false,
    unterminated_private: false,
  }
}

/**
 * Fold one provider delta through a private-span filter.
 *
 * Returns `{state, visible_delta, raw_delta, private_open}`. The filter holds
 * back enough suffix text to recognize an opening private tag even when it is
 * split over multiple chunks.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_private_stream_delta(state = nil, delta = "", config = nil) {
  let existing = __stream_dict(state)
  let cfg = __stream_private_config(config ?? existing?.config)
  let open_tag = cfg.open_tag
  let close_tag = cfg.close_tag
  let close_hold = max(len(close_tag) - 1, 0)
  let raw_delta = __stream_string(delta)
  var pending = __stream_string(existing?.pending) + raw_delta
  var private_open = existing?.private_open ?? false
  var visible_delta = ""
  var loop_count = 0
  while loop_count < 1000 && len(pending) > 0 {
    if private_open {
      let close_at = pending.index_of(close_tag)
      if close_at < 0 {
        pending = __stream_tail(pending, close_hold)
        break
      }
      pending = pending[close_at + len(close_tag):len(pending)]
      private_open = false
    } else {
      let open_at = pending.index_of(open_tag)
      if open_at >= 0 {
        visible_delta = visible_delta + pending[0:open_at]
        pending = pending[open_at + len(open_tag):len(pending)]
        private_open = true
      } else {
        let keep = __stream_open_prefix_suffix_len(pending, open_tag)
        let emit_len = max(len(pending) - keep, 0)
        if emit_len > 0 {
          visible_delta = visible_delta + pending[0:emit_len]
          pending = pending[emit_len:len(pending)]
        }
        break
      }
    }
    loop_count = loop_count + 1
  }
  let next = existing
    + {
    config: cfg,
    pending: pending,
    private_open: private_open,
    raw_text: __stream_string(existing?.raw_text) + raw_delta,
    visible_text: __stream_string(existing?.visible_text) + visible_delta,
    chunks: to_int(existing?.chunks ?? 0) + 1,
    withheld_suffix: len(pending) > 0,
    unterminated_private: private_open,
  }
  return {state: next, visible_delta: visible_delta, raw_delta: raw_delta, private_open: private_open}
}

/**
 * Finalize private-span stream state after the provider stream ends.
 *
 * Any suffix that is not an opening-tag prefix is emitted. Unterminated private
 * blocks stay hidden and are reported with `unterminated_private: true`.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_private_stream_finish(state = nil, config = nil) {
  let existing = __stream_dict(state)
  let cfg = __stream_private_config(config ?? existing?.config)
  let open_tag = cfg.open_tag
  var pending = __stream_string(existing?.pending)
  var final_delta = ""
  let private_open = existing?.private_open ?? false
  if !private_open && pending != "" && !starts_with(open_tag, pending) {
    final_delta = pending
    pending = ""
  }
  let visible = __stream_string(existing?.visible_text) + final_delta
  let next = existing
    + {
    config: cfg,
    pending: pending,
    visible_text: visible,
    withheld_suffix: len(pending) > 0,
    unterminated_private: private_open,
  }
  return {
    state: next,
    ok: true,
    text: __stream_string(existing?.raw_text),
    visible_text: visible,
    visible_delta: final_delta,
    private_open: private_open,
    withheld_suffix: len(pending) > 0,
    unterminated_private: private_open,
  }
}

fn __stream_llm_options(options) {
  return __stream_drop_keys(
    options,
    ["private", "open_tag", "close_tag", "on_delta", "on_chunk", "on_error", "on_done"],
  )
}

/**
 * Stream one LLM call through a private-span filter and terminal envelope.
 *
 * Optional callbacks:
 *   - `on_delta(visible_delta, event, state)`
 *   - `on_chunk(provider_chunk, event, state)`
 *   - `on_done(result)`
 *   - `on_error(result)`
 *
 * Returns `{ok, status, text, visible_text, chunks, finish_reason}` on success
 * and `{ok: false, status: "stream_interrupt", ...}` if stream consumption or
 * a callback throws. This keeps chat apps from hanging without a terminal event.
 *
 * @effects: [llm.call]
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_stream_call(prompt, system = nil, options = nil) {
  let cfg = __stream_dict(options)
  let llm_options = __stream_llm_options(cfg)
  var state = agent_private_stream_state(cfg)
  var finish_reason = nil
  let consumed = try {
    for chunk in llm_stream_call(prompt, system, llm_options) {
      let delta = chunk?.delta ?? chunk?.visible_delta ?? ""
      let event = agent_private_stream_delta(state, delta, cfg)
      state = event.state
      if event.visible_delta != "" && __stream_is_callable(cfg?.on_delta) {
        cfg.on_delta(event.visible_delta, event, state)
      }
      if __stream_is_callable(cfg?.on_chunk) {
        cfg.on_chunk(chunk, event, state)
      }
      if chunk?.finish_reason != nil {
        finish_reason = chunk.finish_reason
      }
    }
    true
  }
  if is_err(consumed) {
    var result = {
      ok: false,
      status: "stream_interrupt",
      error: unwrap_err(consumed),
      text: state.raw_text,
      visible_text: state.visible_text,
      chunks: state.chunks,
      finish_reason: finish_reason,
      state: state,
    }
    if __stream_is_callable(cfg?.on_error) {
      let notified = try {
        cfg.on_error(result)
        true
      }
      if is_err(notified) {
        result = result + {callback_error: unwrap_err(notified)}
      }
    }
    return result
  }
  let finished = agent_private_stream_finish(state, cfg)
  state = finished.state
  if finished.visible_delta != "" && __stream_is_callable(cfg?.on_delta) {
    let event = {state: state, visible_delta: finished.visible_delta, raw_delta: "", private_open: false}
    let notified = try {
      cfg.on_delta(finished.visible_delta, event, state)
      true
    }
    if is_err(notified) {
      var result = {
        ok: false,
        status: "stream_interrupt",
        error: unwrap_err(notified),
        text: state.raw_text,
        visible_text: state.visible_text,
        chunks: state.chunks,
        finish_reason: finish_reason,
        state: state,
      }
      if __stream_is_callable(cfg?.on_error) {
        let error_notified = try {
          cfg.on_error(result)
          true
        }
        if is_err(error_notified) {
          result = result + {callback_error: unwrap_err(error_notified)}
        }
      }
      return result
    }
  }
  var result = {
    ok: true,
    status: "done",
    text: finished.text,
    visible_text: finished.visible_text,
    chunks: state.chunks,
    finish_reason: finish_reason,
    private_open: finished.private_open,
    withheld_suffix: finished.withheld_suffix,
    unterminated_private: finished.unterminated_private,
    state: state,
  }
  if __stream_is_callable(cfg?.on_done) {
    let notified = try {
      cfg.on_done(result)
      true
    }
    if is_err(notified) {
      result = result + {ok: false, status: "stream_interrupt", error: unwrap_err(notified)}
      if __stream_is_callable(cfg?.on_error) {
        let error_notified = try {
          cfg.on_error(result)
          true
        }
        if is_err(error_notified) {
          result = result + {callback_error: unwrap_err(error_notified)}
        }
      }
    }
  }
  return result
}