opensymphony 1.1.1

A Rust implementation of the OpenAI Symphony orchestration design
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
# WebSocket Runtime Contract

This document describes the WebSocket-first integration between OpenSymphony and OpenHands agent-server.

It is intentionally detailed because the runtime stream is the highest-risk adapter surface in the project.

## 1. Scope

This document covers:

- where the WebSocket fits into the runtime lifecycle
- which parts of the OpenHands contract are REST vs WebSocket
- how to attach, recover, and reconcile
- how to decode and store events
- how to translate runtime stream state into Symphony worker outcomes

It does not replace the Symphony poll loop. Linear polling remains a separate orchestrator concern.

## 2. Current contract to implement

## 2.1 Directionality

For the SDK agent-server contract used here:

- REST performs operations
- WebSocket delivers real-time events and state updates

Do not design the MVP around sending agent actions over the WebSocket.

## 2.2 Path shape

Use the SDK client behavior as the pinned wire-level reference.

Current observed path shape:

```text
ws://<host>/sockets/events/{conversation_id}
wss://<host>/sockets/events/{conversation_id}
```

If the host contains a base path, preserve it.

## 2.3 Auth modes

Support these workflow-controlled modes in the Rust client:

- `auto`
  - no auth when `openhands.transport.session_api_key_env` is unset
  - otherwise use the pinned 1.14.0 shape: HTTP header `x-session-api-key`
    plus WebSocket query param `session_api_key`
- `header`
  - keep the same HTTP header and send `X-Session-API-Key` on the WebSocket
    handshake for non-browser clients
- `query_param`
  - keep the same HTTP header and send the configured query parameter on the
    WebSocket handshake

Default local MVP behavior: `auto`, which behaves as none when no session API
key env is configured.

## 3. Event model

## 3.1 High-value typed events

Typed decoding should exist at least for:

- `ConversationStateUpdateEvent`
- `LLMCompletionLogEvent`
- `ConversationErrorEvent` if present in the pinned version
- minimal generic base event envelope with:
  - `id`
  - `timestamp`
  - `source`
  - `kind` or discriminant
  - raw JSON payload

## 3.2 Unknown events

Unknown events must not crash the runtime.

Rules:

- preserve raw JSON
- assign a generic `UnknownEvent`
- store it in the event journal
- ignore it for state transitions unless a later version adds typed handling

## 3.3 State updates

`ConversationStateUpdateEvent` is the most important event type for this adapter.

Use it for:

- readiness barrier
- incremental conversation state mirror
- `execution_status` change tracking
- live snapshot updates without extra REST calls

## 4. WebSocket-first attachment algorithm

Use this sequence whenever attaching to a conversation, both fresh and existing.

### 4.1 Attachment sequence

1. Ensure `conversation_id` is known.
2. Perform an initial full event sync with `GET /api/conversations/{id}/events/search`.
3. Start the WebSocket connection with the configured handshake timeout.
4. Wait for the readiness barrier.
5. Reconcile events again with `GET /events/search`.
6. Only after readiness and reconcile is the stream considered attached.

Attach replay rule:

- queue the initial `/events/search` snapshot into the same ordered pending buffer used by later reconcile inserts
- `next_event()` replays that persisted snapshot for resumed conversations after attach completes
- the WebSocket readiness frame still stays on `ready_event` as the attach barrier unless `/events/search` independently contains the same event ID

### 4.2 Why the double sync exists

There is a race window between:

- initial REST event sync
- successful WebSocket subscription

Without reconciliation after readiness, events emitted in that window can be missed.

This is why the Rust runtime should intentionally copy the high-level behavior of the SDK remote client rather than simplifying it away.

## 5. Readiness barrier

The current SDK client treats the first `ConversationStateUpdateEvent` received after subscription as proof that the subscription is ready. On the pinned `v1.14.0` server this is a full-state snapshot with `key == "full_state"`.

Implement the same rule.

Do not require that the first WebSocket frame be the readiness event.

The client should:

- keep waiting across ping and pong traffic
- ignore unrelated event kinds until a `ConversationStateUpdateEvent` arrives
- treat the envelope kind itself as the readiness signal even if a forward-compatible payload cannot be typed yet
- ignore one malformed or forward-compatible frame and continue waiting until timeout or socket close

Suggested API in Rust:

```rust
enum StreamReady {
    Ready,
    Timeout,
    Closed,
}
```

Configurable timeout:

- default `30000 ms`

If readiness is not achieved, fail the attach attempt and surface a transport error.
Apply the same timeout budget to the WebSocket handshake itself so a blackholed socket does not
wait on kernel TCP timeouts.

Current repository implementation:

- `opensymphony::opensymphony_openhands::OpenHandsClient::wait_for_readiness` loops until an event envelope with kind `ConversationStateUpdateEvent` arrives from `/sockets/events/{conversation_id}`, while tolerating control frames and unrelated or undecodable frames before readiness
- `opensymphony::opensymphony_openhands::OpenHandsClient::attach_runtime_stream` performs the full attach sequence: initial REST sync, WebSocket connect, readiness barrier, and post-ready reconcile before returning a live `RuntimeEventStream`
- `TransportConfig` preserves base-path prefixes and applies REST versus WebSocket auth independently, so reverse-proxied external or authenticated targets keep their `/runtime/...` style prefixes while still matching the pinned auth contract; managed unauthenticated loopback targets normalize those prefixes back to the direct origin before local supervisor startup and client reuse
- the readiness frame is retained on `RuntimeEventStream::ready_event` as an attach barrier and diagnostic snapshot, refreshes the in-memory state mirror only when it is newer than the reconciled state, stays authoritative across later mirror rebuilds until reconcile or REST refresh surfaces an equal or newer decodable state update, can salvage a forward-compatible `state_delta` even when the full payload shape no longer deserializes cleanly, can clear stale terminal REST fallback when a reused conversation has already restarted into an active `queued` or `running` state, and is not replayed through `next_event()` unless `/events/search` independently contains the same event ID
- `RuntimeEventStream::next_event` now drains any immediately available live socket frames into the same ordered pending queue before yielding a later attach-backlog item, so direct consumers still observe timestamp order while replaying persisted history without relying on a fixed read-ahead delay
- workflow-owned `openhands.websocket.ready_timeout_ms`, `reconnect_initial_ms`, and `reconnect_max_ms` overrides are now consumed by the runtime attach/reconnect path; explicit `openhands.websocket.enabled` still remains rejected because the live runtime always opens the readiness socket
- the internal `opensymphony_testkit` module sends a state-update event immediately on default WebSocket attach and can now override individual `/events/search` calls plus per-connection WebSocket frame sequences, so attach/reconcile race windows stay deterministic in CI without standing up ad hoc inline servers for each scenario
- `tests/fake_server_contract.rs`, `tests/client_resilience.rs`, `tests/live_pinned_server.rs`, and `tests/doctor.rs` cover the readiness, attach, auth, and reconcile path, with the shared fake-server contract suite now absorbing the scripted initial-replay, buffered-live ordering, reconnect-exhaustion, and explicit-close cases

## 6. Event cache and reconciliation

## 6.1 Required behavior

The local event cache must:

- hold events in timestamp order
- deduplicate by event ID
- support full sync and incremental add
- allow replay into snapshot derivation
- survive reconnect cycles during one worker lifetime

## 6.2 Ordering

Do not assume WebSocket delivery order is strictly monotonic.

The cache should insert events by timestamp, not just append blindly.

## 6.3 Reconciliation API

Use `GET /api/conversations/{conversation_id}/events/search` with pagination.

The reconcile pass should:

- fetch all pages until no `next_page_id`
- merge only unseen event IDs
- update ordering
- return the number of new events added
- tolerate partial failure by preserving already-cached events

Current repository implementation:

- `OpenHandsClient::search_all_events` paginates until `next_page_id` is absent
- `EventCache` deduplicates by event ID, inserts by timestamp order, and can return the newly merged events from reconcile or reconnect passes
- `RuntimeEventStream` preserves one cache across reconnect cycles and replays ordered state updates into the state mirror after late arrivals
- the contract suite includes multi-page reconciliation, out-of-order insertion, and reconnect-recovery tests

## 6.4 Conversation state mirror

Maintain a conversation state mirror alongside the event cache.

Sources of truth:

- WebSocket `ConversationStateUpdateEvent` for fast incremental state
- `GET /api/conversations/{id}` for authoritative refresh on startup and reconnect

Wire-level compatibility note:

- the pinned source may emit both full-state snapshots and single-key state updates
- the Rust client should support both without depending on undocumented fields leaking into orchestrator code
- per-field ordering should compare parsed RFC3339 timestamps, not raw strings, so equivalent
  offset spellings such as `Z` and `+01:00` do not rewind newer state

Current repository implementation:

- `KnownEvent` now distinguishes `ConversationStateUpdateEvent`, `LLMCompletionLogEvent`, `ConversationErrorEvent`, and `UnknownEvent`
- unknown event kinds retain raw JSON in the event journal instead of failing the stream
- `ConversationStateMirror::rebuild_from` replays the timestamp-ordered cache so late state updates do not regress terminal detection
- `RuntimeEventStream` reapplies the non-replayable readiness snapshot after attach/reconnect and after later cache-driven mirror rebuilds only when reconcile and REST refresh do not already carry an equal or newer decodable state update, still derives a minimal mirror patch from forward-compatible `state_delta` payloads even when typed state decoding would otherwise fall back, and lets an active `queued` or `running` ready barrier clear stale terminal REST fallback for restarted reused conversations
- `ConversationStateMirror::terminal_status` provides the current finished/error/stuck classification used by the probe and future workers

## 6.5 Scheduler-facing event handoff

The orchestrator does not need the full OpenHands wire contract at its boundary.

The worker backend that bridges `RuntimeEventStream` into the scheduler should surface:

- launch-time `ConversationMetadata`
- ordered runtime-event updates carrying `event_id`, `event_kind`, `summary`, and `observed_at`
- a terminal worker outcome once the stream resolves to `finished`, `error`, `stuck`, or another final condition

When a reused conversation is already `queued` or `running` at attach time, emit
that launch-time metadata as soon as attach succeeds instead of waiting for the
previous turn to finish. This lets the scheduler bind the worker to the live
conversation before continuation-retry waiting completes.

This keeps WebSocket protocol churn isolated inside the internal `opensymphony_openhands` module while still giving the orchestrator enough information for stall detection, reconciliation, and snapshot derivation.

## 7. Run lifecycle over REST plus WebSocket

## 7.1 Sending a turn

For each turn:

1. Select or create the conversation for this worker lifetime according to the resolved reuse policy before choosing the prompt shape.
2. Select prompt shape:
   - full rendered workflow prompt on a fresh conversation, including every `fresh_each_run` worker lifetime
   - full rendered workflow prompt again if a reused conversation exists locally but has never been seeded with that first assignment message
   - built-in continuation guidance on resumed seeded conversations or later turns
   - persist the selected prompt under `.opensymphony/prompts/last-*-prompt.(md|json)` and archive the per-run capture under `.opensymphony/runs/attempt-####/`
3. If the reused conversation is already `queued` or `running`, wait for that
   turn to reach a terminal state before sending the next prompt.
4. `POST /api/conversations/{id}/events`
   - user role
   - prompt content
   - `run=false`
5. `POST /api/conversations/{id}/run`
   - if the server returns `409 Conflict`, wait for the active turn to finish,
     reconcile the attached backlog, then retry `POST /run` on the same conversation
6. Observe progress through the WebSocket event stream

## 7.2 Waiting for completion

Primary mechanism:

- watch `ConversationStateUpdateEvent`
- detect `execution_status` entering a terminal state

Fallback mechanism:

- refresh `GET /api/conversations/{id}` if stream health is uncertain
- reconcile events after refresh, but treat that final reconcile as best-effort once the
  authoritative REST snapshot has already confirmed a terminal state
- if the stream reaches the stall timeout without a terminal state, run one final
  `GET /events/search` reconcile before classifying the turn as stalled so
  persisted-but-missed terminal events can still be recovered
- classify the worker if the authoritative state is terminal

## 7.3 Terminal state queue

Implementation recommendation:

- maintain a small internal channel that receives terminal execution-status transitions
- let the worker await this channel with timeout and cancellation support
- keep REST fallback as backup, not as the main loop
- do not report success while a queued `ConversationErrorEvent` still exists in the pending stream buffer, even if the mirrored state has already reached `finished`
- before accepting `finished`, give the socket one extra scheduler turn and buffered-drain pass so a failure frame that arrives immediately after the terminal state update is still observed

## 8. Disconnect and reconnect behavior

## 8.1 Failure modes to handle

- server restart
- network drop
- idle timeout
- daemon reconnect after a transient failure
- temporary auth mismatch
- event decode failure for one message

## 8.2 Reconnect policy

Use bounded exponential backoff:

- initial delay: `1000 ms`
- max delay: `30000 ms`

On reconnect:

1. refresh conversation info with REST
2. reconnect WebSocket with the configured handshake timeout
3. wait for readiness
4. reconcile `events/search` again before trusting the resumed stream
4. reconcile events
5. resume streaming

Buffered-delivery rule:

- if a read yields replayable events and then the socket closes or resets during the same read-ahead window, queue and yield those events first
- only attempt reconnect after the already-queued events have been drained
- if reconnect later exhausts policy limits, surface that failure on the following poll instead of dropping buffered runtime activity

If reconnection exhausts policy limits or the worker deadline, fail the worker and let the orchestrator schedule retry.
If shutdown or cancellation arrives during a reconnect handshake, abort that attempt immediately
instead of waiting for the transport timeout to expire.
Apply the same stop-aware behavior after the socket is accepted but before the first readiness event
arrives, so reconnect shutdown does not wait for `ready_timeout_ms` to expire.

Current repository implementation:

- `RuntimeStreamConfig` carries readiness timeout, bounded exponential backoff, and max reconnect attempts
- `RuntimeEventStream::next_event` reconnects on both clean socket close and transport resets, then re-runs readiness plus reconcile before resuming
- reconnect-required reads now defer reconnect long enough to flush any already-queued events before surfacing exhaustion
- reconnect readiness snapshots remain barriers only; they refresh `ready_event` but are not replayed as synthetic runtime events unless `/events/search` also returns them
- `wait_for_probe_terminal_state` now prefers an already-refreshed terminal `state_mirror()` over surfacing `ReconnectExhausted`, so the doctor/live probe path can complete from authoritative REST state when a run is already terminal but WebSocket reattach fails afterward
- `wait_for_probe_terminal_state` also gives the stream one extra scheduler-turn buffered drain before accepting `finished`, so a just-arrived `ConversationErrorEvent` still fails the probe instead of being skipped after the terminal state update
- after `wait_for_probe_terminal_state` has already succeeded, `run_probe` reuses the stream-backed conversation snapshot plus mirrored terminal `execution_status` instead of requiring one more terminal REST fetch
- `RuntimeEventStream::close` clears any queued replay and deferred reconnect intent before closing the socket, so later polls on that stream instance stay closed instead of reopening the conversation
- the internal `opensymphony_testkit` module can now force live socket drops and script per-connection frame sequences so reconnect and buffered-delivery coverage stay deterministic in CI

## 8.3 Decode failures

A single malformed or unknown event must not kill the stream unless the connection itself is corrupted.

Policy:

- log decode failure with raw payload hash or truncated payload
- keep connection alive if possible
- continue processing subsequent messages

## 9. Cancellation

Worker cancellation must close the WebSocket cleanly and stop waiting on terminal status.

Cancellation sources:

- orchestrator reconciliation
- daemon shutdown
- terminal issue state
- operator stop command in future control-plane versions

## 10. Suggested Rust internal API

## 10.1 Types

Suggested modules and types in `opensymphony_openhands`:

- `WsUrlBuilder`
- `WsAuthMode`
- `EventEnvelope`
- `KnownEvent`
- `UnknownEvent`
- `EventCache`
- `ConversationStateMirror`
- `RuntimeEventStream`
- `RunWatcher`
- `TerminalStatus`

## 10.2 Trait sketch

```rust
trait RuntimeEventStream {
    async fn attach(&mut self) -> Result<()>;
    async fn wait_ready(&mut self, timeout: Duration) -> Result<()>;
    async fn next_event(&mut self) -> Result<Option<RuntimeEvent>>;
    async fn reconcile(&mut self) -> Result<usize>;
    async fn close(&mut self) -> Result<()>;
}
```

Current repository implementation:

- `OpenHandsClient::attach_runtime_stream(conversation_id, RuntimeStreamConfig)` returns a live `RuntimeEventStream`
- `RuntimeEventStream` currently exposes `ready_event`, `event_cache`, `state_mirror`, `next_event`, and `close`
- `OpenHandsClient::wait_for_readiness` remains available as the narrow readiness helper used by lower-level tests and diagnostics

## 11. Relationship to Symphony worker state

The runtime stream informs, but does not define, Symphony outcomes.

Mapping examples:

- terminal `execution_status` `finished` with clean completion:
  - worker may continue another in-process turn or exit normally
- transport failure:
  - abnormal worker exit, schedule backoff retry
- no events for longer than `stall_timeout_ms`:
  - run one final REST event reconcile
  - if no recovered terminal state appears, classify as stalled, terminate worker, schedule retry
- issue becomes terminal in Linear while stream is healthy:
  - orchestrator cancels worker regardless of OpenHands status

## 12. What not to do

- Do not use a polling-only runtime adapter and plan to refactor later.
- Do not use the OpenHands web-app Socket.IO examples for this stream.
- Do not assume the WebSocket alone makes the runtime state authoritative.
- Do not discard unknown events.
- Do not skip the post-ready reconcile step.
- Do not let TUI-specific streaming requirements leak into the runtime contract.

## 13. Test matrix for this component

Required automated scenarios:

- fresh attach with immediate readiness
- attach timeout
- out-of-order WebSocket events
- duplicate event IDs across REST and WebSocket
- disconnect before terminal status
- reconnect plus reconcile catches missed events
- terminal `execution_status` observed over WebSocket
- failure-only events such as `ConversationErrorEvent` do not count as successful completion
- REST fallback after stream uncertainty
- unknown event kind does not crash the stream

Live test scenarios:

- real local agent-server attach
- send prompt, trigger run, receive progress
- clean completion
- forced server restart and reconnect behavior