tsoracle-server 0.1.2

Embeddable gRPC server for the timestamp oracle.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
//
//  ░▀█▀░█▀▀░█▀█░█▀▄░█▀█░█▀▀░█░░░█▀▀
//  ░░█░░▀▀█░█░█░█▀▄░█▀█░█░░░█░░░█▀▀
//  ░░▀░░▀▀▀░▀▀▀░▀░▀░▀░▀░▀▀▀░▀▀▀░▀▀▀
//
//  tsoracle — Distributed Timestamp Oracle
//
//  Copyright (c) 2026 Prisma Risk
//  Licensed under the Apache License, Version 2.0
//  https://github.com/prisma-risk/tsoracle
//

use core::time::Duration;
use parking_lot::Mutex;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::watch;
use tonic::service::Routes;
use tonic::transport::Server as TonicServer;
use tsoracle_consensus::ConsensusDriver;
use tsoracle_core::{Allocator, Clock, SystemClock};
#[cfg(any(test, feature = "test-fakes"))]
use tsoracle_core::{CoreError, WindowGrant};
use tsoracle_proto::v1::tso_service_server::TsoServiceServer;

use crate::service::TsoServiceImpl;

/// Errors returned by [`ServerBuilder::build`] when the builder's
/// configuration cannot be turned into a [`Server`].
#[derive(Debug, thiserror::Error)]
pub enum BuildError {
    /// No consensus driver was supplied via
    /// [`ServerBuilder::consensus_driver`] before `build` was called.
    #[error("consensus_driver is required")]
    MissingConsensusDriver,
    /// Surfaced when [`crate::leader_hint::KEY`] fails [`crate::leader_hint::validate_key`].
    /// Today the key is a valid `const &'static str`, so this variant is
    /// developer-error insurance: a future edit that breaks the key triggers
    /// a startup failure rather than silently stripping the trailer from
    /// every NOT_LEADER response.
    #[error("invalid leader-hint metadata key: {0}")]
    InvalidLeaderHintKey(#[from] tonic::metadata::errors::InvalidMetadataKey),
}

/// Errors surfaced while the server is running, either through the
/// `serve*` entry points or through the leader-watch `JoinHandle` returned
/// by [`Server::into_router`].
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Failure reported by tonic's transport layer. Within this file it
    /// arises when applying the TLS configuration or when the tonic serve
    /// future resolves with an error.
    #[error("transport: {0}")]
    Transport(#[from] tonic::transport::Error),
    /// Wraps an error from the consensus layer.
    /// NOTE(review): produced outside this file (presumably by the
    /// leader-watch loop) — confirm against `crate::fence`.
    #[error("consensus: {0}")]
    Consensus(#[from] tsoracle_consensus::ConsensusError),
    /// Wraps an error from `tsoracle_core`.
    /// NOTE(review): produced outside this file — confirm the call sites.
    #[error("core: {0}")]
    Core(#[from] tsoracle_core::CoreError),
    /// The leader-watch task panicked. Distinct from a clean error return so
    /// operators can tell "driver returned Err" (recoverable design) from
    /// "task panicked" (programming bug).
    #[error("leader-watch task panicked: {payload}")]
    WatchPanic { payload: String },
}

/// Serving status published through the server's `watch` channel
/// (`Server::state_tx` / `Server::state_rx`).
///
/// A freshly built server starts in `NotServing { leader_endpoint: None }`.
#[derive(Clone, Debug)]
pub enum ServingState {
    /// This node is not handing out timestamps. `leader_endpoint` optionally
    /// carries an endpoint string for the current leader.
    /// NOTE(review): presumably surfaced to clients as a redirect hint —
    /// confirm against the service implementation.
    NotServing { leader_endpoint: Option<String> },
    /// This node is serving.
    /// NOTE(review): the transition into this state is not in this file
    /// (presumably published by the leader-watch loop) — confirm.
    Serving,
}

/// Builder for [`Server`]. Obtain one with [`Server::builder`] (or
/// `ServerBuilder::default()`), chain the setters, then call
/// [`ServerBuilder::build`].
pub struct ServerBuilder {
    /// Consensus driver. Required: `build` returns
    /// [`BuildError::MissingConsensusDriver`] when this is still `None`.
    consensus: Option<Arc<dyn ConsensusDriver>>,
    /// Time source. Optional: `build` falls back to `SystemClock` when unset.
    clock: Option<Arc<dyn Clock>>,
    /// Defaults to 3 seconds.
    /// NOTE(review): presumably how far ahead of the clock the persisted
    /// timestamp window is extended — confirm against the extension path.
    window_ahead: Duration,
    /// Defaults to 1 second.
    /// NOTE(review): presumably the safety margin applied when taking over
    /// leadership — confirm against `crate::fence`.
    failover_advance: Duration,
    /// Optional TLS configuration, only present when a TLS feature is
    /// enabled. Applied by the `serve*` methods, not by `into_router`.
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    tls_config: Option<tonic::transport::ServerTlsConfig>,
}

impl Default for ServerBuilder {
    fn default() -> Self {
        ServerBuilder {
            consensus: None,
            clock: None,
            window_ahead: Duration::from_secs(3),
            failover_advance: Duration::from_secs(1),
            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
            tls_config: None,
        }
    }
}

impl ServerBuilder {
    /// Set the consensus driver. This is the only mandatory setting:
    /// [`ServerBuilder::build`] fails with
    /// [`BuildError::MissingConsensusDriver`] without it.
    pub fn consensus_driver(self, driver: Arc<dyn ConsensusDriver>) -> Self {
        Self {
            consensus: Some(driver),
            ..self
        }
    }

    /// Override the time source. When never called, `build` uses
    /// `SystemClock`.
    pub fn clock(self, clock: Arc<dyn Clock>) -> Self {
        Self {
            clock: Some(clock),
            ..self
        }
    }

    /// Override the `window_ahead` duration (default: 3 seconds).
    pub fn window_ahead(self, window_ahead: Duration) -> Self {
        Self {
            window_ahead,
            ..self
        }
    }

    /// Override the `failover_advance` duration (default: 1 second).
    pub fn failover_advance(self, failover_advance: Duration) -> Self {
        Self {
            failover_advance,
            ..self
        }
    }

    /// Configure TLS termination for this server. Applied inside
    /// [`Server::serve`], [`Server::serve_with_shutdown`], and
    /// [`Server::serve_with_listener`]. Not applied to [`Server::into_router`] —
    /// embedders mounting tsoracle alongside their own services control TLS
    /// on their own tonic builder.
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    pub fn tls_config(self, cfg: tonic::transport::ServerTlsConfig) -> Self {
        Self {
            tls_config: Some(cfg),
            ..self
        }
    }

    /// Validate the configuration and assemble a [`Server`].
    ///
    /// The server starts in `ServingState::NotServing { leader_endpoint: None }`
    /// with a fresh allocator and fresh extension lock/gate.
    ///
    /// # Errors
    ///
    /// - [`BuildError::InvalidLeaderHintKey`] if
    ///   [`crate::leader_hint::validate_key`] rejects the metadata key
    ///   (checked first).
    /// - [`BuildError::MissingConsensusDriver`] if no consensus driver was set.
    pub fn build(self) -> Result<Server, BuildError> {
        crate::leader_hint::validate_key()?;

        let Some(consensus) = self.consensus else {
            return Err(BuildError::MissingConsensusDriver);
        };
        let clock = self.clock.unwrap_or_else(|| Arc::new(SystemClock));

        let initial_state = ServingState::NotServing {
            leader_endpoint: None,
        };
        let (state_tx, state_rx) = watch::channel(initial_state);

        let allocator = Arc::new(Mutex::new(Allocator::new()));
        let extension_lock = Arc::new(tokio::sync::Mutex::new(()));
        let extension_gate = Arc::new(tokio::sync::RwLock::new(()));

        Ok(Server {
            consensus,
            clock,
            window_ahead: self.window_ahead,
            failover_advance: self.failover_advance,
            allocator,
            state_tx,
            state_rx,
            extension_lock,
            extension_gate,
            #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
            tls_config: self.tls_config,
        })
    }
}

/// A configured timestamp-oracle server. Built via [`Server::builder`];
/// consumed by [`Server::into_router`] or one of the `serve*` methods.
pub struct Server {
    /// Consensus driver supplied through the builder.
    pub(crate) consensus: Arc<dyn ConsensusDriver>,
    /// Time source; `SystemClock` unless overridden on the builder.
    pub(crate) clock: Arc<dyn Clock>,
    /// Value configured via `ServerBuilder::window_ahead` (default 3 s).
    pub(crate) window_ahead: Duration,
    /// Value configured via `ServerBuilder::failover_advance` (default 1 s).
    pub(crate) failover_advance: Duration,
    /// In-memory timestamp allocator, guarded by a synchronous
    /// `parking_lot` mutex.
    pub(crate) allocator: Arc<Mutex<Allocator>>,
    /// Sending half of the serving-state channel; the step-down helper
    /// publishes `NotServing` through it.
    pub(crate) state_tx: watch::Sender<ServingState>,
    /// Receiving half of the serving-state channel. Public so callers can
    /// observe serving-state transitions. Initially
    /// `NotServing { leader_endpoint: None }`.
    pub state_rx: watch::Receiver<ServingState>,
    /// Serializes window extensions so a stampeding burst of `WindowExhausted`
    /// requests resolves to a single `persist_high_water` round-trip. Acquired
    /// before `extension_gate`; combined with a recheck-after-acquire inside
    /// `extend_window`, latecomers find the window already extended and
    /// return without contacting consensus.
    pub(crate) extension_lock: Arc<tokio::sync::Mutex<()>>,
    /// Read-locked by window-extension calls for the duration of their
    /// prepare → persist → commit dance. The leader-watch task takes a
    /// write lock between clearing serving and calling load_high_water,
    /// which drains all in-flight extensions started under the prior epoch
    /// before the fence proceeds.
    pub(crate) extension_gate: Arc<tokio::sync::RwLock<()>>,
    /// TLS configuration carried over from the builder; applied by the
    /// `serve*` methods only.
    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    pub(crate) tls_config: Option<tonic::transport::ServerTlsConfig>,
}

impl Server {
    /// Start configuring a server; equivalent to `ServerBuilder::default()`.
    pub fn builder() -> ServerBuilder {
        Default::default()
    }

    /// The one transition used whenever there is evidence the current epoch
    /// is no longer valid: a consensus rejection (NotLeader/Fenced during
    /// persist), termination of the leader-watch task, or any other
    /// authoritative sign that leadership was lost.
    ///
    /// Ordering contract: the allocator is cleared first and `NotServing` is
    /// published second. A racing `try_grant` therefore either sees
    /// `CoreError::NotLeader` instead of issuing a stale-epoch grant, or
    /// succeeds against an epoch that is still valid and the *following*
    /// call observes `NotServing`. Both interleavings are safe. Publishing
    /// first and clearing afterwards is not: a request landing between the
    /// two steps would see `Serving` together with a still-leader allocator.
    ///
    /// `extension_gate.write()` is deliberately NOT taken here, because it
    /// would deadlock against in-flight extensions that hold the read lock
    /// while awaiting `persist_high_water`. Those extensions either finish
    /// cleanly (after which the next `try_grant` sees `NotServing`) or fail
    /// with NotLeader/Fenced and call this helper themselves; the helper is
    /// idempotent.
    pub(crate) fn step_down_due_to_consensus_rejection(&self, leader_endpoint: Option<String>) {
        // Building the value has no side effects, so doing it up front does
        // not disturb the clear-then-publish ordering below.
        let not_serving = ServingState::NotServing { leader_endpoint };

        // Step 1: clear the allocator (the guard drops at end of statement).
        self.allocator.lock().on_leadership_lost();

        // Step 2: publish. A send error is deliberately ignored.
        let _ = self.state_tx.send(not_serving);
    }
}

impl Server {
    /// Return the configured `TsoServiceServer<TsoServiceImpl>` as a tonic
    /// `Routes` value plus a `JoinHandle` for the spawned leader-watch task,
    /// so callers can mount tsoracle's service alongside their own services
    /// on a shared tonic listener instead of binding a dedicated port.
    ///
    /// The `JoinHandle` payload is `Result<(), ServerError>` so embedders
    /// can observe leader-watch termination. Before returning an error, the
    /// task publishes `ServingState::NotServing { leader_endpoint: None }`
    /// so all subsequent RPCs fail fast with `FAILED_PRECONDITION` — even
    /// embedders who never inspect the handle get fail-safe behavior.
    ///
    /// The leader-watch task is spawned with `tokio::spawn` as a side effect
    /// of this call, so it must be invoked from within a Tokio runtime, and
    /// dropping the returned handle detaches the task rather than stopping it.
    ///
    /// The `Server::serve()` method is a thin wrapper over this — it calls
    /// `into_router`, builds a tonic `Server`, and binds a listener.
    pub fn into_router(self) -> (Routes, tokio::task::JoinHandle<Result<(), ServerError>>) {
        let server = Arc::new(self);

        let watch_server = server.clone();
        let watch_handle = tokio::spawn(async move {
            use futures::FutureExt;
            // catch_unwind so a panic in run_leader_watch still routes through
            // the poisoning path. Without this, embedders who mount into_router
            // directly and never observe the JoinHandle would see
            // ServingState::Serving remain published while the watch task is
            // dead — the inverse of the fail-safe guarantee documented above.
            // The panic is re-raised after poisoning so serve / serve_with_*
            // continue to translate it into ServerError::WatchPanic via
            // join_to_server_result.
            let outcome =
                std::panic::AssertUnwindSafe(crate::fence::run_leader_watch(watch_server.clone()))
                    .catch_unwind()
                    .await;
            match outcome {
                Ok(result) => {
                    // NOTE(review): a clean `Ok(())` is forwarded without
                    // stepping down here — confirm run_leader_watch leaves
                    // the state non-serving before it returns cleanly.
                    if let Err(ref _e) = result {
                        // Poison BEFORE returning so embedders who do not observe
                        // the JoinHandle still get fail-safe behavior.
                        watch_server.step_down_due_to_consensus_rejection(None);
                        #[cfg(feature = "tracing")]
                        tracing::error!(error = %_e, "leader-watch terminated; serving disabled");
                    }
                    result
                }
                Err(panic_payload) => {
                    // Mirror the Err branch: poison BEFORE re-raising so
                    // handle-dropping embedders still observe NotServing.
                    watch_server.step_down_due_to_consensus_rejection(None);
                    #[cfg(feature = "tracing")]
                    tracing::error!("leader-watch panicked; serving disabled");
                    std::panic::resume_unwind(panic_payload);
                }
            }
        });

        let service = TsoServiceImpl { server };
        #[allow(unused_mut)]
        let mut routes = Routes::new(TsoServiceServer::new(service));
        #[cfg(feature = "reflection")]
        {
            #[expect(
                clippy::expect_used,
                reason = "`FILE_DESCRIPTOR_SET` is generated by `tsoracle-proto`'s `build.rs` from checked-in `.proto` sources; if it ever fails to decode, the build itself is broken. Tracked by #9."
            )]
            let reflection = tonic_reflection::server::Builder::configure()
                .register_encoded_file_descriptor_set(tsoracle_proto::FILE_DESCRIPTOR_SET)
                .build_v1()
                .expect("FILE_DESCRIPTOR_SET emitted by build.rs is always valid");
            routes = routes.add_service(reflection);
        }
        (routes, watch_handle)
    }

    /// Bind `addr` and serve with no caller-controlled shutdown.
    ///
    /// Equivalent to [`Self::serve_with_shutdown`] with a shutdown future
    /// that never resolves, so the call ends only when the leader-watch task
    /// terminates or tonic's serve future finishes on its own.
    pub async fn serve(self, addr: SocketAddr) -> Result<(), ServerError> {
        self.serve_with_shutdown(addr, futures::future::pending())
            .await
    }

    /// Run the gRPC server until either the caller's `shutdown` fires or the
    /// leader-watch task terminates.
    ///
    /// Three outcomes:
    /// 1. `shutdown` fires first → tonic drains in-flights and returns Ok.
    ///    The watch handle is aborted; any error it had been about to return
    ///    is forfeited (the process is shutting down anyway).
    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
    ///    `cancel_tx` triggers tonic's graceful shutdown; in-flight `GetTs`
    ///    calls whose `try_grant` already succeeded complete with the
    ///    timestamps they were allocated; new calls fail fast. Returns `Err(e)`.
    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
    ///    with the panic payload stringified. Same drain semantics as (2).
    ///
    /// # Errors
    ///
    /// In addition to the outcomes above, returns
    /// [`ServerError::Transport`] when the TLS configuration is rejected
    /// (before the leader-watch task is spawned) or when tonic's serve
    /// future resolves with a transport error.
    pub async fn serve_with_shutdown(
        self,
        addr: SocketAddr,
        shutdown: impl Future<Output = ()> + Send + 'static,
    ) -> Result<(), ServerError> {
        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
        let tls_config = self.tls_config.clone();

        // Configure the transport BEFORE `into_router` spawns the
        // leader-watch task. Applying TLS is fallible; if it ran after the
        // spawn, the early return would drop the JoinHandle, which detaches
        // the task instead of cancelling it and would leave the watch loop
        // running with no server attached.
        let mut tonic = TonicServer::builder();
        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
        if let Some(cfg) = tls_config {
            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
        }

        let (routes, mut watch_handle) = self.into_router();
        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();

        // tonic stops when EITHER the user's shutdown fires OR we cancel
        // because the watch task terminated.
        let combined_shutdown = async move {
            tokio::select! {
                _ = shutdown => {}
                _ = cancel_rx => {}
            }
        };

        let serve = tonic
            .add_routes(routes)
            .serve_with_shutdown(addr, combined_shutdown);
        tokio::pin!(serve);

        tokio::select! {
            // Bias toward the watch arm: if both are ready in the same poll
            // (rare but possible — graceful shutdown completed in the same
            // tick the watch returned), we want to surface the watch error
            // rather than report a clean shutdown.
            biased;

            watch_result = &mut watch_handle => {
                // Watch terminated. State is already poisoned (see watch
                // task body in into_router). Trigger tonic drain and wait
                // for it to finish, then report the watch's outcome.
                let _ = cancel_tx.send(());
                let _ = serve.await;
                join_to_server_result(watch_result)
            }
            serve_result = &mut serve => {
                // The serve future finished first: either the user's
                // shutdown fired, or tonic stopped on its own with a
                // transport error (propagated by `?` below). It cannot be
                // our own cancel, because that is only sent from the watch
                // arm. The watch task may still be running; aborting it
                // loses any error it was about to report, but the process
                // is shutting down so that's acceptable.
                watch_handle.abort();
                serve_result?;
                Ok(())
            }
        }
    }

    /// Run the gRPC server on a caller-provided `TcpListener` until either
    /// the caller-provided `shutdown` fires or the leader-watch task terminates.
    ///
    /// Use this instead of [`Self::serve_with_shutdown`] when you need to
    /// observe the OS-picked port (`127.0.0.1:0`) before clients connect, or
    /// when you want to wrap the listener in an outer adapter before passing it
    /// in. The listening socket is owned by the caller and passed here; tsoracle
    /// starts accepting on it immediately.
    ///
    /// Three outcomes:
    /// 1. `shutdown` fires first → tonic drains in-flights and returns `Ok`.
    ///    The watch handle is aborted; any error it had been about to return
    ///    is forfeited (the process is shutting down anyway).
    /// 2. Watch returns `Ok(Err(e))` → poisoned state is already published;
    ///    the caller-provided shutdown is cancelled internally so tonic begins
    ///    graceful shutdown; in-flight `GetTs` calls whose `try_grant` already
    ///    succeeded complete with the timestamps they were allocated; new calls
    ///    fail fast. Returns `Err(e)`.
    /// 3. Watch task panics → returns `Err(ServerError::WatchPanic{..})`
    ///    with the panic payload stringified. Same drain semantics as (2).
    ///
    /// # Errors
    ///
    /// In addition to the outcomes above, returns
    /// [`ServerError::Transport`] when the TLS configuration is rejected
    /// (before the leader-watch task is spawned) or when tonic's serve
    /// future resolves with a transport error.
    pub async fn serve_with_listener(
        self,
        listener: tokio::net::TcpListener,
        shutdown: impl Future<Output = ()> + Send + 'static,
    ) -> Result<(), ServerError> {
        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
        let tls_config = self.tls_config.clone();

        // Same ordering as `serve_with_shutdown`: finish the fallible
        // transport setup before spawning the leader-watch task, so an
        // early return cannot leave a detached watch loop behind.
        let mut tonic = TonicServer::builder();
        #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
        if let Some(cfg) = tls_config {
            tonic = tonic.tls_config(cfg).map_err(ServerError::Transport)?;
        }

        let (routes, mut watch_handle) = self.into_router();
        let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();

        // tonic stops when EITHER the user's shutdown fires OR we cancel
        // because the watch task terminated.
        let combined_shutdown = async move {
            tokio::select! {
                _ = shutdown => {}
                _ = cancel_rx => {}
            }
        };

        let incoming = tonic::transport::server::TcpIncoming::from(listener);

        let serve = tonic
            .add_routes(routes)
            .serve_with_incoming_shutdown(incoming, combined_shutdown);
        tokio::pin!(serve);

        tokio::select! {
            // Prefer the watch arm when both are ready so a watch error is
            // never masked by a simultaneous clean shutdown.
            biased;

            watch_result = &mut watch_handle => {
                // Watch terminated: start tonic's graceful drain, wait for
                // it, then report the watch's outcome.
                let _ = cancel_tx.send(());
                let _ = serve.await;
                join_to_server_result(watch_result)
            }
            serve_result = &mut serve => {
                // Serve finished first (user shutdown or transport error);
                // stop the watch task and propagate any transport error.
                watch_handle.abort();
                serve_result?;
                Ok(())
            }
        }
    }
}

/// Flatten the result of awaiting the leader-watch `JoinHandle` into a plain
/// `Result<(), ServerError>`.
///
/// - `Ok(inner)` — the task ran to completion; its own result (clean exit or
///   error) is forwarded untouched. Whether a clean exit is expected is the
///   caller's call.
/// - `Err(JoinError)` from a panic — becomes `ServerError::WatchPanic`
///   carrying the stringified panic payload.
/// - `Err(JoinError)` from cancellation — becomes `Ok(())`, since the abort
///   was requested by us.
fn join_to_server_result(
    join_result: Result<Result<(), ServerError>, tokio::task::JoinError>,
) -> Result<(), ServerError> {
    let join_err = match join_result {
        Ok(task_result) => return task_result,
        Err(join_err) => join_err,
    };

    if !join_err.is_panic() {
        // Not a panic, so the task was cancelled.
        return Ok(());
    }

    Err(ServerError::WatchPanic {
        payload: panic_payload_to_string(join_err.into_panic()),
    })
}

/// Render a panic payload as text.
///
/// `&'static str` payloads and `String` payloads are returned verbatim;
/// any other payload type yields a fixed fallback message.
fn panic_payload_to_string(panic: Box<dyn std::any::Any + Send>) -> String {
    let from_literal = panic
        .downcast_ref::<&'static str>()
        .map(|literal| (*literal).to_owned());
    // Evaluated lazily: only consulted when the payload was not a literal.
    let from_owned = || panic.downcast_ref::<String>().cloned();

    from_literal
        .or_else(from_owned)
        .unwrap_or_else(|| String::from("watch task panicked with non-string payload"))
}

// Test-only helpers: compiled for this crate's unit tests and for dependents
// that enable the `test-fakes` feature.
#[cfg(any(test, feature = "test-fakes"))]
impl Server {
    /// Test-only entry point for the leader-watch loop. Exposed to integration
    /// tests via the `test-fakes` feature; not part of the stable public API.
    ///
    /// Calls `crate::fence::run_leader_watch` directly, so it bypasses the
    /// panic-catching and step-down wrapper that `into_router` adds.
    #[doc(hidden)]
    pub async fn run_leader_watch_for_tests(self: Arc<Self>) -> Result<(), ServerError> {
        crate::fence::run_leader_watch(self).await
    }

    /// Test-only allocator probe. Issues a window grant against the current
    /// in-memory state without going through the gRPC service. Used by
    /// regression tests that need to observe the behavioral fence (no
    /// timestamp at or below the prior leader's high-water) directly.
    #[doc(hidden)]
    pub fn try_grant_for_tests(&self, count: u32) -> Result<WindowGrant, CoreError> {
        // The lock guard is the method receiver, so it is acquired before
        // the clock is read and is held for the whole `try_grant` call.
        self.allocator.lock().try_grant(self.clock.now_ms(), count)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The boxed payload type handed back by a caught panic.
    type PanicPayload = Box<dyn std::any::Any + Send>;

    #[test]
    fn panic_payload_to_string_recovers_static_str() {
        // A literal `panic!("...")` carries a `&'static str`; operators
        // should see exactly what the watch task said.
        let boxed: PanicPayload = Box::new("watch boom");
        let rendered = panic_payload_to_string(boxed);
        assert_eq!(rendered, "watch boom");
    }

    #[test]
    fn panic_payload_to_string_recovers_owned_string() {
        // A formatted `panic!("{var}")` carries a `String` built at panic
        // time; that branch must be downcast as well.
        let boxed: PanicPayload = Box::new(String::from("formatted"));
        let rendered = panic_payload_to_string(boxed);
        assert_eq!(rendered, "formatted");
    }

    #[test]
    fn panic_payload_to_string_falls_back_for_other_types() {
        // Any payload that is neither string type lands in the fallback.
        struct Custom;
        let boxed: PanicPayload = Box::new(Custom);
        let rendered = panic_payload_to_string(boxed);
        assert_eq!(rendered, "watch task panicked with non-string payload",);
    }

    #[cfg(any(feature = "tls-rustls", feature = "tls-native"))]
    #[test]
    fn builder_stores_tls_config() {
        use crate::test_fakes::InMemoryDriver;

        let built = Server::builder()
            .consensus_driver(Arc::new(InMemoryDriver::new()))
            .tls_config(tonic::transport::ServerTlsConfig::new())
            .build()
            .expect("build with tls_config must succeed");
        assert!(
            built.tls_config.is_some(),
            "tls_config must be stored on Server"
        );
    }

    #[tokio::test]
    async fn join_to_server_result_passes_through_clean_outcome() {
        // Ok(Ok(())): the task finished cleanly and the result is forwarded.
        let joined = tokio::spawn(async { Ok::<(), ServerError>(()) }).await;
        assert!(join_to_server_result(joined).is_ok());
    }

    #[tokio::test]
    async fn join_to_server_result_forwards_inner_error() {
        // Ok(Err(e)): the task's own error comes back unchanged.
        let joined = tokio::spawn(async {
            Err::<(), ServerError>(ServerError::WatchPanic {
                payload: "synthetic".into(),
            })
        })
        .await;
        let outcome = join_to_server_result(joined);
        match outcome {
            Err(ServerError::WatchPanic { payload }) => assert_eq!(payload, "synthetic"),
            other => panic!("expected forwarded WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_translates_panic_to_watch_panic() {
        // A panicking task yields a JoinError with `is_panic()`; it must be
        // surfaced as WatchPanic with the stringified payload.
        let joined = tokio::spawn(async {
            panic!("intentional");
            #[allow(unreachable_code)]
            Ok::<(), ServerError>(())
        })
        .await;
        let outcome = join_to_server_result(joined);
        match outcome {
            Err(ServerError::WatchPanic { payload }) => assert!(payload.contains("intentional")),
            other => panic!("expected WatchPanic, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn join_to_server_result_treats_cancellation_as_clean_exit() {
        // An aborted task yields a cancellation JoinError; the abort was
        // requested by the caller, so it maps to Ok.
        let never_finishes: tokio::task::JoinHandle<Result<(), ServerError>> =
            tokio::spawn(futures::future::pending::<Result<(), ServerError>>());
        never_finishes.abort();
        let joined = never_finishes.await;
        assert!(join_to_server_result(joined).is_ok());
    }
}