/**
* @harn-entrypoint-category agent.stdlib
*
* std/agent/stream — small streaming helpers for app-facing chat surfaces.
* Harn owns transcript and stream semantics; hosts/apps own terminal rendering.
*/
// Coerce `value` to a dict: dicts pass through untouched, anything else
// (including nil) is replaced by an empty dict.
fn __stream_dict(value) {
  if type_of(value) != "dict" {
    return {}
  }
  return value
}
// Render `value` as a string via `to_string`, mapping nil to "".
fn __stream_string(value) -> string {
  if value != nil {
    return to_string(value)
  }
  return ""
}
// Return the trimmed string form of `value`, or `fallback` when that trimmed
// text is empty (nil, "", or whitespace only).
fn __stream_non_empty(value, fallback) -> string {
  let trimmed = trim(__stream_string(value))
  if trimmed != "" {
    return trimmed
  }
  return fallback
}
// True when `type_of(value)` reports one of the callable kinds this module
// recognizes: "function", "closure" or "fn".
fn __stream_is_callable(value) -> bool {
  let tag = type_of(value)
  if tag == "function" {
    return true
  }
  if tag == "closure" {
    return true
  }
  return tag == "fn"
}
// True when `d` is a dict whose key list contains `key`; non-dicts never match.
fn __stream_has_key(d, key) -> bool {
  if type_of(d) != "dict" {
    return false
  }
  return contains(d.keys(), key)
}
// Return `d` (coerced to a dict) with every key listed in `keys` removed.
// Keys that are not present are skipped.
fn __stream_drop_keys(d, keys) {
  var remaining = __stream_dict(d)
  for name in keys {
    if __stream_has_key(remaining, name) {
      // The dict is rebound to whatever `remove` returns.
      remaining = remaining.remove(name)
    }
  }
  return remaining
}
// Normalize private-span options into `{open_tag, close_tag}`.
// Tags are read from the flat options merged with the nested `private` dict
// (the nested dict is the right-hand operand of the merge). Missing or blank
// tags fall back to "<secret>" and "</secret>".
fn __stream_private_config(config) {
  let base = __stream_dict(config)
  let effective = base + __stream_dict(base?.private)
  let open_tag = __stream_non_empty(effective?.open_tag, "<secret>")
  let close_tag = __stream_non_empty(effective?.close_tag, "</secret>")
  return {open_tag: open_tag, close_tag: close_tag}
}
// Return the last `keep` units of `text`. When `keep` is not positive, or the
// text is no longer than `keep`, the text is returned unchanged.
fn __stream_tail(text: string, keep: int) -> string {
  let total = len(text)
  if keep > 0 && total > keep {
    return text[total - keep:total]
  }
  return text
}
// Length of the longest suffix of `text` that is a proper prefix of
// `open_tag` (at most len(open_tag) - 1), or 0 when there is none.
fn __stream_open_prefix_suffix_len(text: string, open_tag: string) -> int {
  let total = len(text)
  // Try the longest candidate first so the first hit is the maximum.
  var candidate = min(total, max(len(open_tag) - 1, 0))
  while candidate > 0 {
    if starts_with(open_tag, text[total - candidate:total]) {
      return candidate
    }
    candidate = candidate - 1
  }
  return 0
}
/**
 * Strip private tagged spans from a complete text value.
 *
 * Defaults to `<secret>...</secret>` and truncates an unterminated private block
 * from the opening tag onward. Every span is removed regardless of how many
 * the text contains.
 *
 * `text` is converted to a string first (nil becomes ""). `config` may carry
 * `open_tag` / `close_tag`, either flat or under a nested `private` dict.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_private_text_filter(text, config = nil) -> string {
  let cfg = __stream_private_config(config)
  let open_tag = cfg.open_tag
  let close_tag = cfg.close_tag
  var rest = __stream_string(text)
  var out = ""
  // Both tags are normalized to non-empty strings, so every pass that does not
  // return removes at least one open tag and one close tag from `rest`. The
  // loop therefore terminates without an iteration cap, and no trailing span
  // can slip through unfiltered.
  while len(rest) > 0 {
    let start = rest.index_of(open_tag)
    if start < 0 {
      // No more private spans: the remainder is entirely visible.
      return out + rest
    }
    out = out + rest[0:start]
    let after_open = rest[start + len(open_tag):len(rest)]
    let end = after_open.index_of(close_tag)
    if end < 0 {
      // Unterminated private block: hide everything from the opening tag on.
      return out
    }
    rest = after_open[end + len(close_tag):len(after_open)]
  }
  return out
}
/**
 * Build the initial state dict for a private-span streaming filter.
 *
 * Pair it with `agent_private_stream_delta(...)` for each provider chunk and
 * `agent_private_stream_finish(...)` at end of stream, so live rendering does
 * not leak an opening private tag that arrives split across chunks.
 *
 * The returned state holds the normalized tag config, empty text buffers, a
 * zero chunk counter, and all flags cleared.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_private_stream_state(config = nil) {
  let initial = {
    config: __stream_private_config(config),
    pending: "",
    private_open: false,
    raw_text: "",
    visible_text: "",
    chunks: 0,
    withheld_suffix: false,
    unterminated_private: false,
  }
  return initial
}
/**
 * Fold one provider delta through a private-span filter.
 *
 * Returns `{state, visible_delta, raw_delta, private_open}`. The filter holds
 * back enough suffix text to recognize an opening private tag even when it is
 * split over multiple chunks. While a private span is open, only the last
 * `len(close_tag) - 1` units are retained so a split closing tag can still be
 * matched; the rest of the hidden text is dropped from `pending`.
 *
 * `state` may be nil or a dict from `agent_private_stream_state(...)` or a
 * previous call. `config`, when given, takes precedence over `state.config`.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_private_stream_delta(state = nil, delta = "", config = nil) {
  let existing = __stream_dict(state)
  let cfg = __stream_private_config(config ?? existing?.config)
  let open_tag = cfg.open_tag
  let close_tag = cfg.close_tag
  let close_hold = max(len(close_tag) - 1, 0)
  let raw_delta = __stream_string(delta)
  var pending = __stream_string(existing?.pending) + raw_delta
  var private_open = existing?.private_open ?? false
  var visible_delta = ""
  // Every pass either breaks or consumes a whole (non-empty) tag from
  // `pending`, so the loop terminates without an iteration cap. A cap could
  // leave unprocessed tags in `pending` for a later flush.
  while len(pending) > 0 {
    if private_open {
      let close_at = pending.index_of(close_tag)
      if close_at < 0 {
        if close_hold > 0 {
          // Keep only what could be the start of a split closing tag.
          pending = __stream_tail(pending, close_hold)
        } else {
          // A one-unit closing tag cannot be split across chunks, so there
          // is nothing worth retaining from the hidden text.
          pending = ""
        }
        break
      }
      pending = pending[close_at + len(close_tag):len(pending)]
      private_open = false
    } else {
      let open_at = pending.index_of(open_tag)
      if open_at >= 0 {
        visible_delta = visible_delta + pending[0:open_at]
        pending = pending[open_at + len(open_tag):len(pending)]
        private_open = true
      } else {
        // Emit everything except a trailing fragment that might turn out to
        // be the beginning of an opening tag.
        let keep = __stream_open_prefix_suffix_len(pending, open_tag)
        let emit_len = max(len(pending) - keep, 0)
        if emit_len > 0 {
          visible_delta = visible_delta + pending[0:emit_len]
          pending = pending[emit_len:len(pending)]
        }
        break
      }
    }
  }
  let next = existing + {
    config: cfg,
    pending: pending,
    private_open: private_open,
    raw_text: __stream_string(existing?.raw_text) + raw_delta,
    visible_text: __stream_string(existing?.visible_text) + visible_delta,
    chunks: to_int(existing?.chunks ?? 0) + 1,
    withheld_suffix: len(pending) > 0,
    unterminated_private: private_open,
  }
  return {state: next, visible_delta: visible_delta, raw_delta: raw_delta, private_open: private_open}
}
/**
 * Close out private-span stream state once the provider stream has ended.
 *
 * A held-back suffix is flushed as the final visible delta unless a private
 * span is still open or the suffix is itself a prefix of the opening tag.
 * Unterminated private blocks remain hidden and are surfaced through
 * `unterminated_private: true`.
 *
 * Returns `{state, ok, text, visible_text, visible_delta, private_open,
 * withheld_suffix, unterminated_private}`; `text` is the accumulated raw text.
 *
 * @effects: []
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_private_stream_finish(state = nil, config = nil) {
  let prior = __stream_dict(state)
  let cfg = __stream_private_config(config ?? prior?.config)
  let still_private = prior?.private_open ?? false
  let held = __stream_string(prior?.pending)
  var flushed = ""
  var leftover = held
  if !still_private && held != "" {
    // Text that could still be the start of an opening tag stays withheld.
    if !starts_with(cfg.open_tag, held) {
      flushed = held
      leftover = ""
    }
  }
  let visible_text = __stream_string(prior?.visible_text) + flushed
  let withheld = len(leftover) > 0
  let next = prior + {
    config: cfg,
    pending: leftover,
    visible_text: visible_text,
    withheld_suffix: withheld,
    unterminated_private: still_private,
  }
  return {
    state: next,
    ok: true,
    text: __stream_string(prior?.raw_text),
    visible_text: visible_text,
    visible_delta: flushed,
    private_open: still_private,
    withheld_suffix: withheld,
    unterminated_private: still_private,
  }
}
// Strip the stream-helper keys (tag config and callbacks) from `options`
// before the remainder is handed to the LLM call.
fn __stream_llm_options(options) {
  let stream_only_keys = ["private", "open_tag", "close_tag", "on_delta", "on_chunk", "on_error", "on_done"]
  return __stream_drop_keys(options, stream_only_keys)
}
// Build the `stream_interrupt` envelope from the stream state reached so far.
fn __stream_interrupt_result(state, finish_reason, cause) {
  return {
    ok: false,
    status: "stream_interrupt",
    error: cause,
    text: state.raw_text,
    visible_text: state.visible_text,
    chunks: state.chunks,
    finish_reason: finish_reason,
    state: state,
  }
}

// Call `cfg.on_error(result)` when it is callable. A failure inside that
// callback is recorded as `callback_error` on the returned result instead of
// propagating.
fn __stream_notify_error(cfg, result) {
  if !__stream_is_callable(cfg?.on_error) {
    return result
  }
  let notified = try {
    cfg.on_error(result)
    true
  }
  if is_err(notified) {
    return result + {callback_error: unwrap_err(notified)}
  }
  return result
}

/**
 * Stream one LLM call through a private-span filter and terminal envelope.
 *
 * Optional callbacks (read from `options`):
 * - `on_delta(visible_delta, event, state)` — only for non-empty visible text
 * - `on_chunk(provider_chunk, event, state)` — for every provider chunk
 * - `on_done(result)`
 * - `on_error(result)`
 *
 * Returns `{ok, status, text, visible_text, chunks, finish_reason}` on success
 * and `{ok: false, status: "stream_interrupt", ...}` if stream consumption or
 * a callback throws. This keeps chat apps from hanging without a terminal event.
 * If `on_error` itself throws, the failure is attached as `callback_error`.
 *
 * The tag and callback keys are removed from `options` before the remainder is
 * passed to `llm_stream_call`.
 *
 * @effects: [llm.call]
 * @errors: []
 * @api_stability: experimental
 */
pub fn agent_stream_call(prompt, system = nil, options = nil) {
  let cfg = __stream_dict(options)
  let llm_options = __stream_llm_options(cfg)
  var state = agent_private_stream_state(cfg)
  var finish_reason = nil
  let consumed = try {
    for chunk in llm_stream_call(prompt, system, llm_options) {
      let delta = chunk?.delta ?? chunk?.visible_delta ?? ""
      let event = agent_private_stream_delta(state, delta, cfg)
      state = event.state
      if event.visible_delta != "" && __stream_is_callable(cfg?.on_delta) {
        cfg.on_delta(event.visible_delta, event, state)
      }
      if __stream_is_callable(cfg?.on_chunk) {
        cfg.on_chunk(chunk, event, state)
      }
      if chunk?.finish_reason != nil {
        finish_reason = chunk.finish_reason
      }
    }
    true
  }
  if is_err(consumed) {
    // Provider stream or a per-chunk callback failed: report what was seen.
    let interrupted = __stream_interrupt_result(state, finish_reason, unwrap_err(consumed))
    return __stream_notify_error(cfg, interrupted)
  }
  let finished = agent_private_stream_finish(state, cfg)
  state = finished.state
  if finished.visible_delta != "" && __stream_is_callable(cfg?.on_delta) {
    // Deliver the suffix that was held back while the stream was live.
    let event = {state: state, visible_delta: finished.visible_delta, raw_delta: "", private_open: false}
    let notified = try {
      cfg.on_delta(finished.visible_delta, event, state)
      true
    }
    if is_err(notified) {
      let interrupted = __stream_interrupt_result(state, finish_reason, unwrap_err(notified))
      return __stream_notify_error(cfg, interrupted)
    }
  }
  var result = {
    ok: true,
    status: "done",
    text: finished.text,
    visible_text: finished.visible_text,
    chunks: state.chunks,
    finish_reason: finish_reason,
    private_open: finished.private_open,
    withheld_suffix: finished.withheld_suffix,
    unterminated_private: finished.unterminated_private,
    state: state,
  }
  if __stream_is_callable(cfg?.on_done) {
    let notified = try {
      cfg.on_done(result)
      true
    }
    if is_err(notified) {
      // A failing `on_done` downgrades the finished result to an interrupt
      // while keeping the fields already computed for the success envelope.
      let downgraded = result + {ok: false, status: "stream_interrupt", error: unwrap_err(notified)}
      result = __stream_notify_error(cfg, downgraded)
    }
  }
  return result
}