limitless/ws/stream.rs
1use crate::prelude::*;
2use crate::ws::client::WsClient;
3use crate::ws::PING_INTERVAL;
4
5use futures::{SinkExt, StreamExt};
6use log::{debug, error, trace, warn};
7use std::time::Instant;
8
9use tokio::sync::mpsc;
10use tokio_tungstenite::tungstenite::Message as WsMessage;
11
12// ── Socket.IO / Engine.IO protocol constants ─────────────────────────────
13
14/// The Socket.IO namespace used by the Limitless Exchange.
15pub const SOCKET_NAMESPACE: &str = "/markets";
16
17/// Engine.IO open packet prefix (server → client handshake).
18const EIO_OPEN: u8 = b'0';
19
20/// Engine.IO close packet.
21const EIO_CLOSE: u8 = b'1';
22
23/// Engine.IO ping packet (server → client).
24const EIO_PING: u8 = b'2';
25
26/// Engine.IO pong packet (client → server).
27const EIO_PONG: u8 = b'3';
28
29/// Engine.IO message packet (carries Socket.IO payload).
30const EIO_MESSAGE: u8 = b'4';
31
32/// Socket.IO connect packet type.
33/// Socket.IO disconnect packet type.
34/// Socket.IO event packet type.
35const SIO_EVENT: u8 = b'2';
36
37/// Socket.IO error packet type.
38// ── Helpers ──────────────────────────────────────────────────────────────
39
40/// Build a Socket.IO event frame for emission to the server.
41///
42/// Format: `42{namespace},["{event}",{data}]`
43///
44/// Returns a complete frame ready to send as a WebSocket text message.
45pub fn frame_socketio_event(event: &str, data: &Value) -> String {
46 let payload = serde_json::to_string(&serde_json::json!([event, data]))
47 .unwrap_or_else(|_| format!(r#"["{}",null]"#, event));
48 format!("42{namespace},{payload}", namespace = SOCKET_NAMESPACE)
49}
50
51/// Build a Socket.IO namespace connect frame.
52///
53/// Format: `40{namespace},`
54pub fn frame_socketio_connect() -> String {
55 format!("40{namespace},", namespace = SOCKET_NAMESPACE)
56}
57
58/// Parse an Engine.IO text message and extract the Socket.IO event name
59/// and payload if this is an event (`42{namespace},[...]`).
60///
61/// Returns `Some((event_name, payload_value))` for events, `None` for
62/// non-event messages (open, close, ping/pong, connect ack, etc.).
63pub fn parse_socketio_message(text: &str) -> Option<(String, Value)> {
64 let bytes = text.as_bytes();
65
66 // Must start with '4' (Engine.IO message)
67 if bytes.is_empty() || bytes[0] != EIO_MESSAGE {
68 return None;
69 }
70
71 let after_eio = &text[1..];
72
73 // Must start with a digit (Socket.IO packet type)
74 let first_sio = after_eio.as_bytes().first()?;
75 if *first_sio != SIO_EVENT {
76 // '40' = connect, '41' = disconnect, '43' = ack, '44' = error
77 // None of these carry user events
78 return None;
79 }
80
81 // Skip the Socket.IO type digit: now we have "2{namespace},[...]"
82 let after_sio_type = &after_eio[1..];
83
84 // Strip namespace prefix: "{namespace},"
85 let event_payload = if let Some(rest) = after_sio_type.strip_prefix(SOCKET_NAMESPACE) {
86 // Strip the comma after namespace
87 rest.strip_prefix(',')?
88 } else {
89 // No namespace prefix — payload starts right after the type digit
90 // (e.g., "2[...]")
91 after_sio_type
92 };
93
94 // Parse the JSON array: ["eventName", {...}]
95 let values: Vec<Value> = serde_json::from_str(event_payload).ok()?;
96 if values.is_empty() {
97 return None;
98 }
99 let event_name = values[0].as_str()?.to_string();
100 let payload = values.get(1).cloned().unwrap_or(Value::Null);
101
102 Some((event_name, payload))
103}
104
105/// Check if a text message is an Engine.IO ping (just the character '2').
106fn is_eio_ping(text: &str) -> bool {
107 text.as_bytes() == [EIO_PING]
108}
109
110/// Check if a text message is an Engine.IO close (just the character '1').
111fn is_eio_close(text: &str) -> bool {
112 text.as_bytes() == [EIO_CLOSE]
113}
114
115/// Check if a text message is the Engine.IO open packet (starts with '0').
116fn is_eio_open(text: &str) -> bool {
117 text.as_bytes().first() == Some(&EIO_OPEN)
118}
119
120/// Check if a text message is a Socket.IO namespace connect ack ('40{namespace},').
121fn is_namespace_connect_ack(text: &str) -> bool {
122 text.starts_with(&format!("40{namespace},", namespace = SOCKET_NAMESPACE))
123}
124
125/// Check if a text message is a Socket.IO namespace disconnect.
126fn is_namespace_disconnect(text: &str) -> bool {
127 text.starts_with(&format!("41{namespace}", namespace = SOCKET_NAMESPACE))
128}
129
130// ── Stream ───────────────────────────────────────────────────────────────
131
132/// Manages WebSocket streaming connections to the Limitless Exchange.
133///
134/// Connects to `wss://ws.limitless.exchange/socket.io/?EIO=4&transport=websocket`
135/// and handles the Socket.IO protocol (Engine.IO v4 + Socket.IO) over the
136/// raw WebSocket transport.
137///
138/// # Event Reference
139///
140/// | Client → Server (emit) | Auth | Description |
141/// |-------------------------------|------|------------------------------------|
142/// | `subscribe_market_prices` | No | AMM prices + CLOB orderbook |
143/// | `subscribe_positions` | Yes | Portfolio position updates |
144/// | `subscribe_order_events` | Yes | OME + settlement lifecycle |
145/// | `subscribe_market_lifecycle` | No | Market creation / resolution |
146///
147/// | Server → Client (on) | Auth | Description |
148/// |-----------------------|------|--------------------------------------|
149/// | `newPriceData` | No | AMM price update |
150/// | `orderbookUpdate` | No | CLOB orderbook snapshot |
151/// | `positions` | Yes | Position balance change |
152/// | `orderEvent` | Yes | OME state or settlement result |
153/// | `marketCreated` | No | New market funded and visible |
154/// | `marketResolved` | No | Market resolved with winning outcome |
155/// | `system` | — | System notifications |
156/// | `authenticated` | Yes | Auth confirmation |
157/// | `exception` | — | Error notifications |
158#[derive(Clone)]
159pub struct Stream {
160 pub client: Client,
161}
162
163impl Stream {
164 /// Tests connectivity by performing the full Socket.IO handshake.
165 ///
166 /// Connects, reads the Engine.IO open packet, sends namespace connect,
167 /// and verifies the server acknowledges it.
168 pub async fn ws_ping(&self) -> Result<(), LimitlessError> {
169 let stream = self.client.wss_connect(None, false, None).await?;
170 let mut ws_client = WsClient::new(stream);
171
172 // ── Phase 1: Engine.IO open ──────────────────────────────
173 let open_text = Self::read_text_message(ws_client.stream()).await?;
174 if !is_eio_open(&open_text) {
175 return Err(LimitlessError::Base(format!(
176 "Expected Engine.IO open packet (0{{...}}), got: {}",
177 &open_text[..open_text.len().min(80)]
178 )));
179 }
180 trace!("Engine.IO open received: {}", open_text);
181
182 // ── Phase 2: Socket.IO namespace connect ─────────────────
183 let connect_frame = frame_socketio_connect();
184 ws_client
185 .stream()
186 .send(WsMessage::Text(connect_frame.into()))
187 .await
188 .map_err(|e| {
189 LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
190 })?;
191
192 let ack_text = Self::read_text_message(ws_client.stream()).await?;
193 if !is_namespace_connect_ack(&ack_text) {
194 return Err(LimitlessError::Base(format!(
195 "Expected namespace connect ack (40/markets,), got: {}",
196 &ack_text[..ack_text.len().min(80)]
197 )));
198 }
199 trace!("Namespace connected: {}", ack_text);
200
201 // ── Send proper close ────────────────────────────────────
202 let _ = ws_client
203 .stream()
204 .send(
205 WsMessage::Text(format!("41{namespace},", namespace = SOCKET_NAMESPACE).into())
206 .into(),
207 )
208 .await;
209 let _ = ws_client.disconnect().await;
210
211 Ok(())
212 }
213
214 /// Subscribe to a public data stream with an event handler callback.
215 ///
216 /// The `handler` receives a `Value` that is an array `[event_name, payload]`
217 /// for Socket.IO events, or the raw parsed JSON for other messages.
218 ///
219 /// # Example
220 ///
221 /// ```no_run
222 /// use limitless::prelude::*;
223 ///
224 /// #[tokio::main]
225 /// async fn main() {
226 /// let stream: Stream = Limitless::new(None, None);
227 /// stream
228 /// .ws_subscribe(|event| {
229 /// println!("Received: {:?}", event);
230 /// Ok(())
231 /// })
232 /// .await
233 /// .unwrap();
234 /// }
235 /// ```
236 pub async fn ws_subscribe<F>(&self, handler: F) -> Result<(), LimitlessError>
237 where
238 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
239 {
240 let stream = self.client.wss_connect(None, false, None).await?;
241 let mut ws_client = WsClient::new(stream);
242 Self::event_loop(&mut ws_client, handler, None).await?;
243 Ok(())
244 }
245
246 /// Subscribe to a stream with dynamic command support.
247 ///
248 /// Allows emitting subscription commands (subscribe/unsubscribe) after
249 /// the connection is established. Commands should be complete Socket.IO
250 /// frames (e.g., `42/markets,["subscribe_market_prices",{...}]`).
251 ///
252 /// Use [`frame_socketio_event`] to build properly framed commands.
253 pub async fn ws_subscribe_with_commands<F>(
254 &self,
255 cmd_receiver: mpsc::UnboundedReceiver<String>,
256 handler: F,
257 ) -> Result<(), LimitlessError>
258 where
259 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
260 {
261 let stream = self.client.wss_connect(None, false, None).await?;
262 let mut ws_client = WsClient::new(stream);
263 Self::event_loop(&mut ws_client, handler, Some(cmd_receiver)).await?;
264 Ok(())
265 }
266
267 /// Subscribe to market updates for a specific slug.
268 ///
269 /// Handles the full lifecycle: connect, handshake, subscribe, and
270 /// event dispatch. The handler receives `[event_name, payload]` arrays.
271 ///
272 /// # Example
273 ///
274 /// ```no_run
275 /// use limitless::prelude::*;
276 ///
277 /// #[tokio::main]
278 /// async fn main() {
279 /// let ws: Stream = Limitless::new(None, None);
280 /// ws.ws_subscribe_market("btc-above-100k", |event_name, payload| {
281 /// println!("{event_name}: {payload}");
282 /// Ok(())
283 /// }).await.unwrap();
284 /// }
285 /// ```
286 pub async fn ws_subscribe_market<F>(
287 &self,
288 market_slug: &str,
289 mut handler: F,
290 ) -> Result<(), LimitlessError>
291 where
292 F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
293 {
294 let stream = self.client.wss_connect(None, false, None).await?;
295 let mut ws_client = WsClient::new(stream);
296
297 // ── Handshake ─────────────────────────────────────────────
298 Self::perform_handshake(&mut ws_client).await?;
299
300 // ── Subscribe ─────────────────────────────────────────────
301 let sub = frame_socketio_event(
302 "subscribe_market_prices",
303 &serde_json::json!({"marketSlugs": [market_slug]}),
304 );
305 ws_client
306 .stream()
307 .send(WsMessage::Text(sub.into()))
308 .await
309 .map_err(|e| LimitlessError::Base(format!("Failed to send subscription: {}", e)))?;
310 debug!("Subscribed to market prices for: {}", market_slug);
311
312 // ── Event loop with typed dispatch ────────────────────────
313 Self::typed_event_loop(&mut ws_client, &mut handler, None).await?;
314 Ok(())
315 }
316
317 /// Subscribe to the WebSocket event stream and receive typed [`WsEventKind`] events.
318 ///
319 /// Connects, performs the Socket.IO handshake, then enters an event
320 /// loop that parses every incoming server event through
321 /// [`deserialize_event`] before passing the resulting [`WsEventKind`]
322 /// to `handler`.
323 ///
324 /// # Example
325 ///
326 /// ```no_run
327 /// use limitless::prelude::*;
328 ///
329 /// #[tokio::main]
330 /// async fn main() {
331 /// let ws: Stream = Limitless::new(None, None);
332 /// ws.ws_subscribe_events(|event| {
333 /// match event {
334 /// WsEventKind::NewPriceData(p) => println!("AMM prices for {}", p.market_address),
335 /// WsEventKind::TradeEvent(t) => println!("Trade: {} @ {}", t.size, t.price),
336 /// WsEventKind::Unknown(payload) => println!("Unknown: {payload:?}"),
337 /// other => println!("Event: {other:?}"),
338 /// }
339 /// Ok(())
340 /// }).await.unwrap();
341 /// }
342 /// ```
343 pub async fn ws_subscribe_events<F>(&self, mut handler: F) -> Result<(), LimitlessError>
344 where
345 F: FnMut(WsEventKind) -> Result<(), LimitlessError> + 'static + Send,
346 {
347 let stream = self.client.wss_connect(None, false, None).await?;
348 let mut ws_client = WsClient::new(stream);
349
350 // ── Handshake ─────────────────────────────────────────────
351 Self::perform_handshake(&mut ws_client).await?;
352
353 // ── Typed dispatch wrapper ────────────────────────────────
354 let mut adapter = move |event_name: &str, payload: &Value| -> Result<(), LimitlessError> {
355 match deserialize_event(event_name, payload) {
356 Some(kind) => handler(kind),
357 None => {
358 debug!("Failed to deserialize event '{}', skipping", event_name);
359 Ok(())
360 }
361 }
362 };
363
364 // ── Event loop with typed dispatch ────────────────────────
365 Self::typed_event_loop(&mut ws_client, &mut adapter, None).await?;
366 Ok(())
367 }
368
369 /// Perform the Socket.IO handshake: read Engine.IO open, send namespace
370 /// connect, wait for ack.
371 async fn perform_handshake(ws_client: &mut WsClient) -> Result<(), LimitlessError> {
372 // Read Engine.IO open packet
373 let open_text = Self::read_text_message(ws_client.stream()).await?;
374 if !is_eio_open(&open_text) {
375 return Err(LimitlessError::Base(format!(
376 "Expected Engine.IO open packet (0{{...}}), got: {}",
377 &open_text[..open_text.len().min(120)]
378 )));
379 }
380 debug!("Engine.IO open: {}", open_text);
381
382 // Send namespace connect
383 let connect_frame = frame_socketio_connect();
384 ws_client
385 .stream()
386 .send(WsMessage::Text(connect_frame.into()))
387 .await
388 .map_err(|e| {
389 LimitlessError::Base(format!("Failed to send namespace connect: {}", e))
390 })?;
391
392 // Read namespace connect ack
393 let ack_text = Self::read_text_message(ws_client.stream()).await?;
394 if !is_namespace_connect_ack(&ack_text) {
395 return Err(LimitlessError::Base(format!(
396 "Expected namespace connect ack (40/markets,), got: {}",
397 &ack_text[..ack_text.len().min(120)]
398 )));
399 }
400 debug!("Namespace connected: {}", ack_text);
401
402 Ok(())
403 }
404
405 /// Read the next text (or binary → text) message from the stream.
406 async fn read_text_message(
407 stream: &mut WebSocketStream<MaybeTlsStream<TcpStream>>,
408 ) -> Result<String, LimitlessError> {
409 match stream.next().await {
410 Some(Ok(WsMessage::Text(text))) => Ok(text.to_string()),
411 Some(Ok(WsMessage::Binary(data))) => String::from_utf8(data.to_vec())
412 .map_err(|e| LimitlessError::Base(format!("Invalid UTF-8 in binary frame: {}", e))),
413 Some(Ok(other)) => Err(LimitlessError::Base(format!(
414 "Expected text frame, got: {:?}",
415 other
416 ))),
417 Some(Err(e)) => Err(LimitlessError::Tungstenite(e)),
418 None => Err(LimitlessError::Base(
419 "WebSocket connection closed during handshake".to_string(),
420 )),
421 }
422 }
423
424 /// Core event loop: reads WebSocket messages, dispatches to handler,
425 /// sends periodic pings, and processes outgoing subscription commands.
426 ///
427 /// Performs the Socket.IO handshake before entering the main loop.
428 pub(crate) async fn event_loop<F>(
429 ws_client: &mut WsClient,
430 mut handler: F,
431 mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
432 ) -> Result<(), LimitlessError>
433 where
434 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
435 {
436 // ── Handshake phase ────────────────────────────────────────────
437 Self::perform_handshake(ws_client).await?;
438
439 // ── Main event loop ────────────────────────────────────────────
440 let mut last_ping = Instant::now();
441
442 loop {
443 tokio::select! {
444 // ── Incoming WebSocket message ─────────────────────────
445 msg = ws_client.stream().next() => {
446 match msg {
447 Some(Ok(WsMessage::Text(text))) => {
448 Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
449 }
450 Some(Ok(WsMessage::Binary(data))) => {
451 if let Ok(text) = String::from_utf8(data.to_vec()) {
452 Self::handle_incoming_text(&text, &mut handler, ws_client).await?;
453 }
454 }
455 Some(Ok(WsMessage::Ping(data))) => {
456 let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
457 }
458 Some(Ok(WsMessage::Close(_))) => {
459 trace!("WebSocket closed by server");
460 return Ok(());
461 }
462 Some(Err(e)) => {
463 return Err(LimitlessError::Tungstenite(e));
464 }
465 None => {
466 trace!("WebSocket stream ended");
467 return Ok(());
468 }
469 _ => {}
470 }
471 }
472
473 // ── Outgoing command ───────────────────────────────────
474 cmd = async {
475 match cmd_receiver.as_mut() {
476 Some(rx) => rx.recv().await,
477 None => std::future::pending().await,
478 }
479 } => {
480 if let Some(cmd) = cmd {
481 debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
482 if let Err(e) = ws_client
483 .stream()
484 .send(WsMessage::Text(cmd.into()))
485 .await
486 {
487 error!("Failed to send command: {}", e);
488 }
489 }
490 }
491
492 // ── Periodic Engine.IO ping ────────────────────────────
493 _ = tokio::time::sleep(PING_INTERVAL) => {
494 let now = Instant::now();
495 if now.duration_since(last_ping) >= PING_INTERVAL {
496 // Send Engine.IO ping (the string "2")
497 let _ = ws_client
498 .stream()
499 .send(WsMessage::Text(String::from("2").into()))
500 .await;
501 last_ping = now;
502 }
503 }
504 }
505 }
506 }
507
508 /// Typed event loop: like `event_loop` but calls a `FnMut(&str, &Value)`
509 /// handler instead of `FnMut(Value)`.
510 pub(crate) async fn typed_event_loop<F>(
511 ws_client: &mut WsClient,
512 handler: &mut F,
513 mut cmd_receiver: Option<mpsc::UnboundedReceiver<String>>,
514 ) -> Result<(), LimitlessError>
515 where
516 F: FnMut(&str, &Value) -> Result<(), LimitlessError> + 'static + Send,
517 {
518 let mut last_ping = Instant::now();
519
520 loop {
521 tokio::select! {
522 // ── Incoming WebSocket message ─────────────────────────
523 msg = ws_client.stream().next() => {
524 match msg {
525 Some(Ok(WsMessage::Text(text))) => {
526 if let Some((event_name, payload)) = parse_socketio_message(&text) {
527 if let Err(e) = handler(&event_name, &payload) {
528 error!("WS handler error on '{event_name}': {}", e);
529 }
530 } else if is_eio_ping(&text) {
531 let _ = ws_client.stream()
532 .send(WsMessage::Text(String::from("3").into()))
533 .await;
534 } else if is_eio_close(&text) || is_namespace_disconnect(&text) {
535 trace!("Socket.IO close/disconnect received");
536 return Ok(());
537 }
538 // Ignore other messages (open, connect ack, pong, etc.)
539 }
540 Some(Ok(WsMessage::Binary(data))) => {
541 if let Ok(text) = String::from_utf8(data.to_vec()) {
542 if let Some((event_name, payload)) = parse_socketio_message(&text) {
543 if let Err(e) = handler(&event_name, &payload) {
544 error!("WS handler error on '{event_name}': {}", e);
545 }
546 }
547 }
548 }
549 Some(Ok(WsMessage::Ping(data))) => {
550 let _ = ws_client.stream().send(WsMessage::Pong(data)).await;
551 }
552 Some(Ok(WsMessage::Close(_))) => {
553 trace!("WebSocket closed by server");
554 return Ok(());
555 }
556 Some(Err(e)) => {
557 return Err(LimitlessError::Tungstenite(e));
558 }
559 None => {
560 trace!("WebSocket stream ended");
561 return Ok(());
562 }
563 _ => {}
564 }
565 }
566
567 // ── Outgoing command ───────────────────────────────────
568 cmd = async {
569 match cmd_receiver.as_mut() {
570 Some(rx) => rx.recv().await,
571 None => std::future::pending().await,
572 }
573 } => {
574 if let Some(cmd) = cmd {
575 debug!("WS send: {}", &cmd[..cmd.len().min(200)]);
576 if let Err(e) = ws_client
577 .stream()
578 .send(WsMessage::Text(cmd.into()))
579 .await
580 {
581 error!("Failed to send command: {}", e);
582 }
583 }
584 }
585
586 // ── Periodic Engine.IO ping ────────────────────────────
587 _ = tokio::time::sleep(PING_INTERVAL) => {
588 let now = Instant::now();
589 if now.duration_since(last_ping) >= PING_INTERVAL {
590 let _ = ws_client
591 .stream()
592 .send(WsMessage::Text(String::from("2").into()))
593 .await;
594 last_ping = now;
595 }
596 }
597 }
598 }
599 }
600
601 /// Handle an incoming text message from the WebSocket.
602 ///
603 /// Dispatches Engine.IO control frames and Socket.IO events.
604 async fn handle_incoming_text<F>(
605 text: &str,
606 handler: &mut F,
607 ws_client: &mut WsClient,
608 ) -> Result<(), LimitlessError>
609 where
610 F: FnMut(Value) -> Result<(), LimitlessError> + 'static + Send,
611 {
612 // Engine.IO ping → reply pong
613 if is_eio_ping(text) {
614 let _ = ws_client
615 .stream()
616 .send(WsMessage::Text(String::from("3").into()))
617 .await;
618 return Ok(());
619 }
620
621 // Engine.IO close or Socket.IO namespace disconnect
622 if is_eio_close(text) || is_namespace_disconnect(text) {
623 trace!("Socket.IO close/disconnect");
624 return Err(LimitlessError::Base(
625 "Server closed the Socket.IO connection".to_string(),
626 ));
627 }
628
629 // Engine.IO pong — ignore
630 if text.as_bytes() == [EIO_PONG] {
631 return Ok(());
632 }
633
634 // Try to parse as a Socket.IO event (42/markets,[...])
635 if let Some((event_name, payload)) = parse_socketio_message(text) {
636 // Pass as [event_name, payload] array for backward compat
637 let event_array = serde_json::json!([event_name, payload]);
638 if let Err(e) = handler(event_array) {
639 error!("WS handler error on '{event_name}': {}", e);
640 }
641 return Ok(());
642 }
643
644 // Socket.IO connect ack, plain open, etc. — ignore
645 if is_namespace_connect_ack(text) || is_eio_open(text) {
646 return Ok(());
647 }
648
649 // Unknown message — log and try to parse as raw JSON
650 warn!("Unhandled WS message: {}", &text[..text.len().min(200)]);
651 if let Ok(value) = serde_json::from_str::<Value>(text) {
652 let _ = handler(value);
653 }
654
655 Ok(())
656 }
657}
658
659impl Limitless for Stream {
660 fn new(api_key: Option<String>, secret: Option<String>) -> Self {
661 Self::new_with_config(&Config::default(), api_key, secret)
662 }
663
664 fn new_with_config(config: &Config, api_key: Option<String>, secret: Option<String>) -> Self {
665 Self {
666 client: Client::new(api_key, secret, config.rest_api_endpoint.to_string()),
667 }
668 }
669}