d_engine_server/api/embedded.rs
1//! Embedded mode for d-engine - application-friendly API
2//!
3//! This module provides [`EmbeddedEngine`], a high-level wrapper around [`Node`]
4//! that simplifies lifecycle management for embedded use cases.
5//!
6//! ## Comparison: Node vs EmbeddedEngine
7//!
8//! ### Using Node (Low-level API)
9//! ```ignore
10//! let node = NodeBuilder::new(config).start().await?;
11//! // Node does not provide client directly - use EmbeddedEngine instead
12//! tokio::spawn(async move { node.run().await });
13//! // Manual lifecycle management required
14//! ```
15//!
16//! ### Using EmbeddedEngine (High-level API)
17//! ```ignore
18//! let engine = EmbeddedEngine::start("./data/my-app").await?;
19//! engine.wait_ready(Duration::from_secs(5)).await?;
20//! let client = engine.client();
21//! engine.stop().await?;
22//! // Lifecycle managed automatically
23//! ```
24//!
25//! ## When to Use
26//!
27//! - **EmbeddedEngine**: Application developers who want simplicity
28//! - **Node**: Framework developers who need fine-grained control
29//!
30//! # Application Responsibilities in Multi-Node Deployments
31//!
32//! EmbeddedEngine provides **cluster state APIs** but does NOT handle request routing.
33//! Applications are responsible for handling follower write requests.
34//!
35//! ## What EmbeddedEngine Provides
36//!
37//! - ✅ `is_leader()` - Check if current node is leader
38//! - ✅ `leader_info()` - Get leader ID and term
39//! - ✅ `EmbeddedClient` returns `NotLeader` error on follower writes
40//! - ✅ Zero-overhead in-process communication (<0.1ms)
41//!
42//! ## What Applications Must Handle
43//!
44//! - ❌ Request routing (follower → leader forwarding)
45//! - ❌ Load balancing across nodes
46//! - ❌ Health check endpoints for load balancers
47//!
48//! ## Integration Patterns
49//!
50//! ### Pattern 1: Load Balancer Health Checks (Recommended for HA)
51//!
52//! **Application provides HTTP health check endpoints:**
53//! ```ignore
54//! use axum::{Router, routing::get, http::StatusCode};
55//!
56//! // Health check endpoint for load balancer (e.g. HAProxy)
57//! async fn health_primary(engine: Arc<EmbeddedEngine>) -> StatusCode {
58//! if engine.is_leader() {
59//! StatusCode::OK // 200 - Load balancer routes writes here
60//! } else {
61//! StatusCode::SERVICE_UNAVAILABLE // 503 - Load balancer skips this node
62//! }
63//! }
64//!
65//! async fn health_replica(engine: Arc<EmbeddedEngine>) -> StatusCode {
66//! if !engine.is_leader() {
67//! StatusCode::OK // 200 - Load balancer routes reads here
68//! } else {
69//! StatusCode::SERVICE_UNAVAILABLE // 503
70//! }
71//! }
72//!
73//! let app = Router::new()
74//! .route("/primary", get(health_primary)) // For write traffic
75//! .route("/replica", get(health_replica)); // For read traffic
76//!
77//! axum::Server::bind(&"0.0.0.0:8008".parse()?)
78//! .serve(app.into_make_service())
79//! .await?;
80//! ```
81//!
82//! ### Pattern 2: Pre-check Before Write (Simple)
83//!
84//! **Application checks leadership before handling writes:**
85//! ```ignore
86//! async fn handle_write_request(engine: &EmbeddedEngine, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
87//! if !engine.is_leader() {
88//! // Return HTTP 503 to client
89//! return Err("Not leader, please retry on another node");
90//! }
91//!
92//! // Safe to write (this node is leader)
93//! engine.client().put(key, value).await?;
94//! Ok(())
95//! }
96//! ```
97//!
98//! ### Pattern 3: HTTP 307 Redirect (Alternative)
99//!
100//! **Application returns redirect to leader:**
101//! ```ignore
102//! async fn handle_write_request(engine: &EmbeddedEngine) -> Response {
103//! match engine.client().put(key, value).await {
104//! Ok(_) => Response::ok(),
105//! Err(LocalClientError::NotLeader { leader_address: Some(addr), .. }) => {
106//! // Redirect client to leader
107//! Response::redirect_307(addr)
108//! }
109//! Err(e) => Response::error(e),
110//! }
111//! }
112//! ```
113//!
114//! ## Design Philosophy
115//!
116//! **Why doesn't EmbeddedEngine auto-forward writes?**
117//!
118//! 1. **Preserves zero-overhead guarantee** - Auto-forwarding requires gRPC client (adds network/serialization)
119//! 2. **Maintains simplicity** - No hidden network calls in "embedded" mode
120//! 3. **Flexible deployment** - Applications choose load balancer health checks, HTTP redirect, or other strategies
121//! 4. **Performance transparency** - Developers know exactly when network calls occur
122//!
123//! For auto-forwarding with gRPC overhead, use standalone mode with `GrpcClient`.
124
125use crate::Result;
126#[cfg(feature = "rocksdb")]
127use crate::RocksDBStateMachine;
128#[cfg(feature = "rocksdb")]
129use crate::RocksDBStorageEngine;
130#[cfg(feature = "rocksdb")]
131use crate::RocksDBUnifiedEngine;
132use crate::StateMachine;
133use crate::StorageEngine;
134use crate::api::EmbeddedClient;
135use crate::node::NodeBuilder;
136use std::sync::Arc;
137use std::time::Duration;
138use tokio::sync::Mutex;
139use tokio::sync::watch;
140use tokio::task::JoinHandle;
141use tracing::error;
142use tracing::info;
143
144struct Inner {
145 node_handle: Mutex<Option<JoinHandle<Result<()>>>>,
146 shutdown_tx: watch::Sender<()>,
147 client: Arc<EmbeddedClient>,
148 leader_elected_rx: watch::Receiver<Option<crate::LeaderInfo>>,
149 membership_rx: watch::Receiver<crate::membership::MembershipSnapshot>,
150 is_stopped: Mutex<bool>,
151 node_id: u32,
152}
153
154/// Embedded d-engine with automatic lifecycle management.
155///
156/// **Thread-safe**: Clone and share across threads freely.
157/// All methods use `&self` - safe to call from multiple contexts.
158///
159/// Provides high-level KV API for embedded usage:
160/// - `start()` / `start_with()` - Initialize and spawn node
161/// - `wait_ready()` - Wait for leader election
162/// - `client()` - Get embedded client
163/// - `stop()` - Graceful shutdown
164///
165/// # Example
166/// ```ignore
167/// use d_engine::EmbeddedEngine;
168/// use std::time::Duration;
169///
170/// let engine = EmbeddedEngine::start("./data/my-app").await?;
171/// engine.wait_ready(Duration::from_secs(5)).await?;
172///
173/// let client = engine.client();
174/// client.put(b"key", b"value").await?;
175///
176/// engine.stop().await?;
177/// ```
178#[derive(Clone)]
179pub struct EmbeddedEngine {
180 inner: Arc<Inner>,
181}
182
183impl EmbeddedEngine {
184 /// Start engine with an explicit data directory.
185 ///
186 /// `data_dir` has highest priority and always overrides `cluster.db_root_dir` from
187 /// `CONFIG_PATH` or `RAFT__` environment variables. Other configuration (network,
188 /// Raft timeouts, cluster topology) is still read from those sources if set.
189 ///
190 /// The directory is created automatically if it does not exist.
191 /// If it already contains data the engine opens it in place (idempotent).
192 ///
193 /// # Example
194 /// ```ignore
195 /// // Minimal — just supply a path
196 /// let engine = EmbeddedEngine::start("./data/my-app").await?;
197 /// engine.wait_ready(Duration::from_secs(5)).await?;
198 ///
199 /// // Works with any AsRef<Path>
200 /// let engine = EmbeddedEngine::start(std::path::Path::new("/var/lib/my-app")).await?;
201 /// ```
202 #[cfg(feature = "rocksdb")]
203 pub async fn start(data_dir: impl AsRef<std::path::Path>) -> Result<Self> {
204 let mut config = d_engine_core::RaftNodeConfig::new()?;
205 config.cluster.db_root_dir = data_dir.as_ref().to_path_buf();
206 let config = config.validate()?;
207
208 let base_dir = config.cluster.db_root_dir.clone();
209 tokio::fs::create_dir_all(&base_dir)
210 .await
211 .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
212
213 let (storage, mut sm) = if config.storage.unified_db {
214 let db_path = base_dir.join("db");
215 info!(
216 "Starting embedded engine with unified RocksDB at {:?}",
217 db_path
218 );
219 RocksDBUnifiedEngine::open(&db_path)?
220 } else {
221 info!(
222 "Starting embedded engine with separate RocksDB instances at {:?}",
223 base_dir
224 );
225 let storage = RocksDBStorageEngine::new(base_dir.join("storage"))?;
226 let sm = RocksDBStateMachine::new(base_dir.join("state_machine"))?;
227 (storage, sm)
228 };
229
230 let lease_cfg = &config.raft.state_machine.lease;
231 if lease_cfg.enabled {
232 let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
233 sm.set_lease(lease);
234 }
235
236 Self::start_node(config, Arc::new(storage), Arc::new(sm)).await
237 }
238
239 /// Start engine with explicit configuration file.
240 ///
241 /// Reads configuration from specified file path.
242 /// Data directory is determined by config's `cluster.db_root_dir` setting.
243 ///
244 /// # Arguments
245 /// - `config_path`: Path to configuration file (e.g. "d-engine.toml")
246 ///
247 /// # Example
248 /// ```ignore
249 /// let engine = EmbeddedEngine::start_with("config/node1.toml").await?;
250 /// engine.wait_ready(Duration::from_secs(5)).await?;
251 /// ```
252 #[cfg(feature = "rocksdb")]
253 pub async fn start_with(config_path: &str) -> Result<Self> {
254 let config = d_engine_core::RaftNodeConfig::new()?
255 .with_override_config(config_path)?
256 .validate()?;
257 let base_dir = std::path::PathBuf::from(&config.cluster.db_root_dir);
258
259 tokio::fs::create_dir_all(&base_dir)
260 .await
261 .map_err(|e| crate::Error::Fatal(format!("Failed to create data directory: {e}")))?;
262
263 let (storage, mut sm) = if config.storage.unified_db {
264 let db_path = base_dir.join("db");
265 info!(
266 "Starting embedded engine with unified RocksDB at {:?}",
267 db_path
268 );
269 RocksDBUnifiedEngine::open(&db_path)?
270 } else {
271 info!(
272 "Starting embedded engine with separate RocksDB instances at {:?}",
273 base_dir
274 );
275 let storage = RocksDBStorageEngine::new(base_dir.join("storage"))?;
276 let sm = RocksDBStateMachine::new(base_dir.join("state_machine"))?;
277 (storage, sm)
278 };
279
280 // Inject lease if enabled
281 let lease_cfg = &config.raft.state_machine.lease;
282 if lease_cfg.enabled {
283 let lease = Arc::new(crate::storage::DefaultLease::new(lease_cfg.clone()));
284 sm.set_lease(lease);
285 }
286
287 Self::start_custom(Arc::new(storage), Arc::new(sm), Some(config_path)).await
288 }
289
290 /// Start engine with custom storage and state machine.
291 ///
292 /// Advanced API for users providing custom storage implementations.
293 ///
294 /// # Arguments
295 /// - `config_path`: Optional path to configuration file
296 /// - `storage_engine`: Custom storage engine implementation
297 /// - `state_machine`: Custom state machine implementation
298 ///
299 /// # Example
300 /// ```ignore
301 /// let storage = Arc::new(MyCustomStorage::new()?);
302 /// let sm = Arc::new(MyCustomStateMachine::new()?);
303 /// let engine = EmbeddedEngine::start_custom(storage, sm, None).await?;
304 /// ```
305 pub async fn start_custom<SE, SM>(
306 storage_engine: Arc<SE>,
307 state_machine: Arc<SM>,
308 config_path: Option<&str>,
309 ) -> Result<Self>
310 where
311 SE: StorageEngine + std::fmt::Debug + 'static,
312 SM: StateMachine + std::fmt::Debug + 'static,
313 {
314 let node_config = if let Some(path) = config_path {
315 d_engine_core::RaftNodeConfig::default()
316 .with_override_config(path)?
317 .validate()?
318 } else {
319 d_engine_core::RaftNodeConfig::new()?.validate()?
320 };
321
322 Self::start_node(node_config, storage_engine, state_machine).await
323 }
324
325 /// Build and launch the node from a validated config and pre-built storage.
326 async fn start_node<SE, SM>(
327 node_config: d_engine_core::RaftNodeConfig,
328 storage_engine: Arc<SE>,
329 state_machine: Arc<SM>,
330 ) -> Result<Self>
331 where
332 SE: StorageEngine + std::fmt::Debug + 'static,
333 SM: StateMachine + std::fmt::Debug + 'static,
334 {
335 info!("Starting embedded d-engine");
336
337 let (shutdown_tx, shutdown_rx) = watch::channel(());
338
339 let node = NodeBuilder::init(node_config, shutdown_rx)
340 .storage_engine(storage_engine)
341 .state_machine(state_machine)
342 .start()
343 .await?;
344
345 let leader_elected_rx = node.leader_change_notifier();
346 let membership_rx = node.membership_change_notifier();
347
348 #[cfg(not(feature = "watch"))]
349 let client = Arc::new(EmbeddedClient::new_internal(
350 node.event_tx.clone(),
351 node.cmd_tx.clone(),
352 node.node_id,
353 Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
354 ));
355
356 #[cfg(feature = "watch")]
357 let client = {
358 let watch_registry = node.watch_registry.clone();
359 let mut client = EmbeddedClient::new_internal(
360 node.event_tx.clone(),
361 node.cmd_tx.clone(),
362 node.node_id,
363 Duration::from_millis(node.node_config.raft.general_raft_timeout_duration_in_ms),
364 );
365 if let Some(registry) = &watch_registry {
366 client = client.with_watch_registry(registry.clone());
367 }
368 Arc::new(client)
369 };
370
371 let node_id = node.node_id();
372
373 let node_handle = tokio::spawn(async move {
374 if let Err(e) = node.run().await {
375 error!("Node run error: {:?}", e);
376 Err(e)
377 } else {
378 Ok(())
379 }
380 });
381
382 info!("Embedded d-engine started successfully");
383
384 Ok(Self {
385 inner: Arc::new(Inner {
386 node_handle: Mutex::new(Some(node_handle)),
387 shutdown_tx,
388 client,
389 leader_elected_rx,
390 membership_rx,
391 is_stopped: Mutex::new(false),
392 node_id,
393 }),
394 })
395 }
396
397 /// Wait until the cluster is ready to serve requests.
398 ///
399 /// Blocks until a leader has been elected **and** its no-op entry is committed
400 /// by a majority of nodes (Raft §8). Only at this point is the leader guaranteed
401 /// to be aware of all previously committed entries and safe to serve reads and writes.
402 ///
403 /// Event-driven notification (no polling), <1ms latency after noop commit.
404 ///
405 /// # Timeout Guidelines
406 ///
407 /// **Single-node mode** (most common for development):
408 /// - Typical: <100ms (near-instant election)
409 /// - Recommended: `Duration::from_secs(3)`
410 ///
411 /// **Multi-node HA cluster** (production):
412 /// - Typical: 1-3s (depends on network latency and `general_raft_timeout_duration_in_ms`)
413 /// - Recommended: `Duration::from_secs(10)`
414 ///
415 /// **Special cases**:
416 /// - Health checks: `Duration::from_secs(3)` (fail fast if cluster unhealthy)
417 /// - Startup scripts: `Duration::from_secs(30)` (allow time for cluster stabilization)
418 /// - Development/testing: `Duration::from_secs(5)` (balance between speed and reliability)
419 ///
420 /// # Returns
421 /// - `Ok(LeaderInfo)` - Leader elected successfully
422 /// - `Err(...)` - Timeout or cluster unavailable
423 ///
424 /// # Example
425 /// ```ignore
426 /// // Single-node development
427 /// let engine = EmbeddedEngine::start("./data/my-app").await?;
428 /// let leader = engine.wait_ready(Duration::from_secs(3)).await?;
429 ///
430 /// // Multi-node production
431 /// let engine = EmbeddedEngine::start_with("cluster.toml").await?;
432 /// let leader = engine.wait_ready(Duration::from_secs(10)).await?;
433 /// println!("Leader elected: {} (term {})", leader.leader_id, leader.term);
434 /// ```
435 pub async fn wait_ready(
436 &self,
437 timeout: std::time::Duration,
438 ) -> Result<crate::LeaderInfo> {
439 let mut rx = self.inner.leader_elected_rx.clone();
440
441 tokio::time::timeout(timeout, async {
442 // Check current value first (leader may already be elected)
443 if let Some(info) = rx.borrow().as_ref() {
444 info!(
445 "Leader already elected: {} (term {})",
446 info.leader_id, info.term
447 );
448 return Ok(*info);
449 }
450
451 loop {
452 // Wait for leader election event (event-driven, no polling)
453 let _ = rx.changed().await;
454
455 // Check if a leader is elected
456 if let Some(info) = rx.borrow().as_ref() {
457 info!("Leader elected: {} (term {})", info.leader_id, info.term);
458 return Ok(*info);
459 }
460 }
461 })
462 .await
463 .map_err(|_| crate::Error::Fatal("Leader election timeout".to_string()))?
464 }
465
466 /// Subscribe to leader change notifications.
467 ///
468 /// Returns a receiver that will be notified whenever:
469 /// - First leader is elected
470 /// - Leader changes (re-election)
471 /// - No leader exists (during election)
472 ///
473 /// # Performance
474 /// Event-driven notification (no polling), <1ms latency
475 ///
476 /// # Example
477 /// ```ignore
478 /// let mut leader_rx = engine.leader_change_notifier();
479 /// tokio::spawn(async move {
480 /// while leader_rx.changed().await.is_ok() {
481 /// match leader_rx.borrow().as_ref() {
482 /// Some(info) => println!("Leader: {} (term {})", info.leader_id, info.term),
483 /// None => println!("No leader"),
484 /// }
485 /// }
486 /// });
487 /// ```
488 pub fn leader_change_notifier(&self) -> watch::Receiver<Option<crate::LeaderInfo>> {
489 self.inner.leader_elected_rx.clone()
490 }
491
492 /// Subscribe to committed membership change notifications.
493 ///
494 /// Returns a `watch::Receiver` that fires whenever a `ConfChange` entry
495 /// (node join, removal, or learner promotion) is committed by a majority.
496 ///
497 /// All nodes — leader, follower, and learner — fire the notification because
498 /// every node walks the same `CommitHandler::apply_config_change()` path.
499 ///
500 /// The first `borrow()` returns the current membership state immediately
501 /// without waiting for a change.
502 ///
503 /// ## Distinguishing this from other notifiers
504 ///
505 /// - [`Self::leader_change_notifier`]: fires on leader election changes, **not** membership changes
506 /// - [`Self::wait_ready`]: resolves when a leader is elected and the cluster is ready, **not** membership changes
507 ///
508 /// ## Usage
509 /// ```ignore
510 /// let mut rx = engine.watch_membership();
511 /// while rx.changed().await.is_ok() {
512 /// let snapshot = rx.borrow_and_update().clone();
513 /// // snapshot.members — current voters
514 /// // snapshot.learners — current non-voting learners
515 /// // snapshot.committed_index — idempotency key
516 /// scheduler.on_membership_changed(snapshot).await;
517 /// }
518 /// ```
519 pub fn watch_membership(&self) -> watch::Receiver<crate::membership::MembershipSnapshot> {
520 self.inner.membership_rx.clone()
521 }
522
523 /// Returns true if the current node is the Raft leader.
524 ///
525 /// # Use Cases
526 /// - Load balancer health checks (e.g. HAProxy `/primary` endpoint returning HTTP 200/503)
527 /// - Prevent write requests to followers before they fail
528 /// - Application-level request routing decisions
529 ///
530 /// # Performance
531 /// Zero-cost operation (reads cached leader state from watch channel)
532 ///
533 /// # Example
534 /// ```ignore
535 /// // Load balancer health check endpoint
536 /// #[get("/primary")]
537 /// async fn health_primary(engine: &EmbeddedEngine) -> StatusCode {
538 /// if engine.is_leader() {
539 /// StatusCode::OK // Routes writes here
540 /// } else {
541 /// StatusCode::SERVICE_UNAVAILABLE
542 /// }
543 /// }
544 ///
545 /// // Application request handler
546 /// if engine.is_leader() {
547 /// client.put(key, value).await?;
548 /// } else {
549 /// return Err("Not leader, write rejected");
550 /// }
551 /// ```
552 pub fn is_leader(&self) -> bool {
553 self.inner
554 .leader_elected_rx
555 .borrow()
556 .as_ref()
557 .map(|info| info.leader_id == self.inner.node_id)
558 .unwrap_or(false)
559 }
560
561 /// Returns current leader information if available.
562 ///
563 /// # Returns
564 /// - `Some(LeaderInfo)` if a leader is elected (includes leader_id and term)
565 /// - `None` if no leader exists (during election or network partition)
566 ///
567 /// # Use Cases
568 /// - Monitoring dashboards showing cluster state
569 /// - Debugging leader election issues
570 /// - Logging cluster topology changes
571 ///
572 /// # Example
573 /// ```ignore
574 /// if let Some(info) = engine.leader_info() {
575 /// println!("Leader: {} (term {})", info.leader_id, info.term);
576 /// } else {
577 /// println!("No leader elected, cluster unavailable");
578 /// }
579 /// ```
580 pub fn leader_info(&self) -> Option<crate::LeaderInfo> {
581 *self.inner.leader_elected_rx.borrow()
582 }
583
584 /// Get a reference to the local KV client.
585 ///
586 /// The client is available immediately after `start()`,
587 /// but requests will only succeed after `wait_ready()` completes.
588 ///
589 /// # Example
590 /// ```ignore
591 /// let engine = EmbeddedEngine::start("./data/my-app").await?;
592 /// engine.wait_ready(Duration::from_secs(5)).await?;
593 /// let client = engine.client();
594 /// client.put(b"key", b"value").await?;
595 /// ```
596 pub fn client(&self) -> Arc<EmbeddedClient> {
597 Arc::clone(&self.inner.client)
598 }
599
600 /// Stop the embedded d-engine gracefully (idempotent).
601 ///
602 /// This method:
603 /// 1. Sends shutdown signal to node
604 /// 2. Waits for node.run() to complete
605 /// 3. Propagates any errors from node execution
606 ///
607 /// Safe to call multiple times - subsequent calls are no-ops.
608 ///
609 /// # Errors
610 /// Returns error if node encountered issues during shutdown.
611 ///
612 /// # Example
613 /// ```ignore
614 /// engine.stop().await?;
615 /// engine.stop().await?; // No-op, returns Ok(())
616 /// ```
617 pub async fn stop(&self) -> Result<()> {
618 let mut is_stopped = self.inner.is_stopped.lock().await;
619 if *is_stopped {
620 return Ok(());
621 }
622
623 info!("Stopping embedded d-engine");
624
625 // Send shutdown signal
626 let _ = self.inner.shutdown_tx.send(());
627
628 // Wait for node task to complete
629 let mut handle_guard = self.inner.node_handle.lock().await;
630 if let Some(handle) = handle_guard.take() {
631 match handle.await {
632 Ok(result) => {
633 info!("Embedded d-engine stopped successfully");
634 *is_stopped = true;
635 result
636 }
637 Err(e) => {
638 error!("Node task panicked: {:?}", e);
639 *is_stopped = true;
640 Err(crate::Error::Fatal(format!("Node task panicked: {e}")))
641 }
642 }
643 } else {
644 *is_stopped = true;
645 Ok(())
646 }
647 }
648
649 /// Check if the engine has been stopped.
650 ///
651 /// # Example
652 /// ```ignore
653 /// if engine.is_stopped() {
654 /// println!("Engine is stopped");
655 /// }
656 /// ```
657 pub async fn is_stopped(&self) -> bool {
658 *self.inner.is_stopped.lock().await
659 }
660
661 /// Returns the unique identifier for this Raft node.
662 pub fn node_id(&self) -> u32 {
663 self.inner.node_id
664 }
665}
666
667impl Drop for EmbeddedEngine {
668 fn drop(&mut self) {
669 // Warn if stop() was not called
670 if let Ok(handle) = self.inner.node_handle.try_lock()
671 && let Some(h) = &*handle
672 && !h.is_finished()
673 {
674 error!("EmbeddedEngine dropped without calling stop() - background task may leak");
675 }
676 }
677}