Skip to main content

de_mls/app/session/
inbound.rs

1//! Session-side inbound dispatch.
2//!
3//! `dispatch_inbound_result` and every `ProcessResult` branch handler live
4//! on `SessionRunner` as associated functions taking `Arc<RwLock<Self>>` so
5//! they can release the runner lock across `.await` points without holding
6//! it during proposal lifecycles.
7//!
8//! `LeaveConversation` is split: the session-side helper `prepare_self_leave`
9//! does the protocol work (emit `Leaving`, take and delete the MLS service);
10//! the User-side caller drops the entry from the registry, cleans up the
11//! consensus scope, and broadcasts `ConversationLifecycle::Removed`. The
12//! session method returns [`DispatchOutcome::LeaveRequested`] so the caller
13//! knows to finish the lifecycle on the User side.
14
15use std::sync::{Arc, RwLock};
16
17use hashgraph_like_consensus::protos::consensus::v1::Proposal;
18use prost::Message;
19use tracing::{error, info};
20
21use crate::{
22    app::{
23        ConversationState, LockExt, SessionRunner, UserError,
24        session::{
25            consensus::build_vote_banner_event,
26            consensus_bridge::{forward_incoming_proposal, forward_incoming_vote},
27            runner::send_packet,
28        },
29    },
30    core::{
31        ConsensusPlugin, ConversationPluginsFactory, PeerScoringPlugin, ProcessResult,
32        ProposalKind, ScoreSnapshot, SessionEvent, StewardList, StewardListConfig,
33        StewardListPlugin, member_set,
34    },
35    mls_crypto::MlsService,
36    protos::de_mls::messages::v1::{
37        AppMessage, ConversationMessage, ConversationSync, ConversationUpdateRequest, TimingConfig,
38        conversation_update_request,
39    },
40};
41
42/// What `SessionRunner::dispatch_inbound_result` hands back to the
43/// caller. `LeaveRequested` signals that the session has done its
44/// protocol-side teardown (emitted `Leaving`, deleted MLS state) and the
45/// caller — which holds the User-side handles — must drop the entry
46/// from the registry and broadcast the lifecycle removal.
47#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub enum DispatchOutcome {
49    /// No further User-side action required.
50    Done,
51    /// The session has prepared itself for removal; the caller should
52    /// remove the registry entry and clean up the consensus scope.
53    LeaveRequested,
54}
55
56impl<P: ConsensusPlugin, CP: ConversationPluginsFactory> SessionRunner<P, CP> {
57    /// Dispatch a single [`ProcessResult`] to its branch handler. Called
58    /// by `User::process_inbound_packet` after the MLS-side
59    /// `process_inbound` returns a result, and by other call sites that
60    /// produce a `ProcessResult` (e.g. the welcome-side
61    /// `JoinedConversation`).
62    pub(crate) async fn dispatch_inbound_result(
63        arc: &Arc<RwLock<Self>>,
64        result: ProcessResult,
65    ) -> Result<DispatchOutcome, UserError> {
66        match result {
67            ProcessResult::AppMessage(msg) => {
68                arc.read_or_err("session")?
69                    .emit_event(SessionEvent::AppMessage(*msg));
70                Ok(DispatchOutcome::Done)
71            }
72            ProcessResult::Proposal(proposal) => {
73                Self::on_incoming_proposal(arc, *proposal).await?;
74                Ok(DispatchOutcome::Done)
75            }
76            ProcessResult::Vote(vote) => {
77                let proposal_id = vote.proposal_id;
78                let (consensus, conversation_name, outcome_applied) = {
79                    let s = arc.read_or_err("session")?;
80                    (
81                        s.consensus.clone(),
82                        s.conversation_name.clone(),
83                        s.handle
84                            .conversation
85                            .is_consensus_outcome_applied(proposal_id),
86                    )
87                };
88                forward_incoming_vote::<P>(&conversation_name, *vote, &consensus, outcome_applied)
89                    .await?;
90                Ok(DispatchOutcome::Done)
91            }
92            ProcessResult::MembershipChangeReceived(request) => {
93                Self::handle_incoming_update_request(arc, *request).await?;
94                Ok(DispatchOutcome::Done)
95            }
96            ProcessResult::JoinedConversation(_name) => {
97                // `name` is always this conversation's name — `process_inbound`
98                // emits it via the local MLS service. Use the session's own
99                // `conversation_name` rather than the parameter.
100                Self::on_joined_conversation(arc).await?;
101                Ok(DispatchOutcome::Done)
102            }
103            ProcessResult::ConversationUpdated => {
104                Self::on_conversation_updated(arc).await?;
105                Ok(DispatchOutcome::Done)
106            }
107            ProcessResult::LeaveConversation => {
108                Self::prepare_self_leave(arc)?;
109                Ok(DispatchOutcome::LeaveRequested)
110            }
111            ProcessResult::CommitCandidateReceived { steward } => {
112                Self::on_commit_candidate_received(arc, &steward).await?;
113                Ok(DispatchOutcome::Done)
114            }
115            ProcessResult::ConversationSyncReceived(sync) => {
116                Self::on_conversation_sync(arc, *sync)?;
117                Ok(DispatchOutcome::Done)
118            }
119            ProcessResult::Noop(reason) => {
120                let conv_name = arc.read_or_err("session")?.conversation_name.clone();
121                tracing::debug!(
122                    conversation = %conv_name,
123                    ?reason,
124                    "inbound dispatched as noop"
125                );
126                Ok(DispatchOutcome::Done)
127            }
128        }
129    }
130
131    /// Before forwarding to consensus, mirror intent into local buffers:
132    /// emergency proposals set the partial-freeze flag and resolve any
133    /// locally-buffered ECP for the same violation; membership-change
134    /// proposals get mirrored into the pending-update buffer so a future
135    /// epoch steward can retry if this round fails.
136    ///
137    /// RFC §"Partial Freeze Semantics" asks that lower-priority proposals
138    /// from peers be DROPPED during an active emergency, not merely locally
139    /// blocked. We don't drop today — the RFC's Δ-synchrony assumption keeps
140    /// divergence windows small. Consensus-service-level priority gating is
141    /// tracked as a backlog item in `docs/ROADMAP.md`.
142    async fn on_incoming_proposal(
143        arc: &Arc<RwLock<Self>>,
144        proposal: Proposal,
145    ) -> Result<(), UserError> {
146        let decoded = ConversationUpdateRequest::decode(proposal.payload.as_slice()).ok();
147        if let Some(req) = decoded.as_ref() {
148            let mut s = arc.write_or_err("session")?;
149            let current_epoch = match s.handle.mls() {
150                Some(mls) => mls.current_epoch()?,
151                None => 0,
152            };
153            match &req.payload {
154                Some(conversation_update_request::Payload::EmergencyCriteria(_)) => {
155                    s.handle
156                        .conversation
157                        .observe_emergency(proposal.proposal_id);
158                }
159                Some(conversation_update_request::Payload::InviteMember(_))
160                | Some(conversation_update_request::Payload::RemoveMember(_)) => {
161                    s.handle
162                        .conversation
163                        .buffer_pending_update(req.clone(), current_epoch);
164                }
165                _ => {}
166            }
167        }
168        let proposal_id = proposal.proposal_id;
169        let expected_voters = proposal.expected_voters_count;
170        let payload = proposal.payload.clone();
171        let kind = decoded
172            .as_ref()
173            .map(ProposalKind::of)
174            .unwrap_or(ProposalKind::Commit);
175        let (consensus, conversation_name) = {
176            let s = arc.read_or_err("session")?;
177            (s.consensus.clone(), s.conversation_name.clone())
178        };
179        forward_incoming_proposal::<P>(&conversation_name, proposal, &consensus).await?;
180        // Skip the banner + auto-vote for fast-path proposals: the
181        // creator's bundled YES already resolved the session, so peers have
182        // nothing to vote on.
183        if expected_voters > 1 {
184            let banner = build_vote_banner_event(&conversation_name, proposal_id, payload);
185            arc.read_or_err("session")?
186                .emit_event(SessionEvent::AppMessage(banner));
187            let (delay, vote) = {
188                let s = arc.read_or_err("session")?;
189                (
190                    s.handle.config.voting_delay_for(kind),
191                    s.handle.config.liveness_criteria_yes,
192                )
193            };
194            arc.write_or_err("session")?
195                .register_auto_vote(proposal_id, delay, vote);
196        }
197        Ok(())
198    }
199
200    /// We just joined via welcome. Broadcast a system "joined" chat message,
201    /// sync scoring, and transition to Working. Pending-update pruning is
202    /// defensive — PendingJoin doesn't buffer, but paths may change.
203    async fn on_joined_conversation(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
204        arc.write_or_err("session")?
205            .prune_pending_updates_after_commit()?;
206
207        let (packet, mls_members, conversation_name) = {
208            let mut s = arc.write_or_err("session")?;
209            let msg: AppMessage = ConversationMessage {
210                message: format!("User {} joined the conversation", s.identity_display)
211                    .into_bytes(),
212                sender: "SYSTEM".to_string(),
213                conversation_name: s.conversation_name.clone(),
214            }
215            .into();
216            let app_id = Arc::clone(&s.app_id);
217            let conversation_name = s.conversation_name.clone();
218            let mls = s.handle.expect_mls_mut()?;
219            let members = mls.members().unwrap_or_default();
220            let packet = mls.build_message(&msg, &app_id)?;
221            (packet, members, conversation_name)
222        };
223        let transport = Arc::clone(arc.read_or_err("session")?.transport());
224        send_packet(&transport, packet)?;
225        arc.read_or_err("session")?.emit_event(SessionEvent::Joined);
226        arc.write_or_err("session")?
227            .sync_scoring_members(&mls_members);
228
229        let event = arc.write_or_err("session")?.start_working();
230        arc.read_or_err("session")?
231            .emit_event(SessionEvent::PhaseChange(event));
232        info!(conversation = %conversation_name, "joined conversation");
233        Ok(())
234    }
235
236    /// A commit merged. Sync scoring + pending-update buffers, transition to
237    /// Working, and run steward housekeeping (auto-fill, election kick-off,
238    /// buffered-update drain). The commit author's `SuccessfulCommit`
239    /// reward is emitted by `finalize_freeze_round`, not here.
240    async fn on_conversation_updated(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
241        let mls_members = {
242            let s = arc.read_or_err("session")?;
243            match s.handle.mls() {
244                Some(mls) => mls.members().unwrap_or_default(),
245                None => Vec::new(),
246            }
247        };
248        arc.write_or_err("session")?
249            .sync_scoring_members(&mls_members);
250        arc.write_or_err("session")?
251            .prune_pending_updates_after_commit()?;
252
253        // Transition to Working BEFORE steward checks (election needs Working
254        // state). Reset reelection_round: this commit advanced the epoch,
255        // so whatever retry cycle we were in belongs to the previous epoch.
256        let working_event = {
257            let mut s = arc.write_or_err("session")?;
258            s.handle.steward_list.reset_retry();
259            let state = s.handle.current_state();
260            if matches!(
261                state,
262                ConversationState::Working
263                    | ConversationState::Freezing
264                    | ConversationState::Selection
265                    | ConversationState::Reelection
266            ) {
267                Some(s.start_working())
268            } else {
269                None
270            }
271        };
272
273        Self::steward_list_housekeeping(arc).await?;
274        Self::process_buffered_updates(arc).await?;
275        Self::maybe_close_recovery_window(arc).await;
276
277        if let Some(event) = working_event {
278            arc.read_or_err("session")?
279                .emit_event(SessionEvent::PhaseChange(event));
280        }
281        Ok(())
282    }
283
284    /// Fire a steward election while `recovery_mode` is set so the next
285    /// list installs and closes the window.
286    async fn maybe_close_recovery_window(arc: &Arc<RwLock<Self>>) {
287        let in_recovery_mode = match arc.read_or_err("session") {
288            Ok(s) => s.handle.is_in_recovery_mode(),
289            Err(e) => {
290                tracing::warn!(error = %e, "recovery window check skipped: session lock poisoned");
291                return;
292            }
293        };
294        if !in_recovery_mode {
295            return;
296        }
297        if let Err(e) = Self::try_initiate_steward_election(arc, true, None).await {
298            let conv_name = arc
299                .read_or_err("session")
300                .map(|s| s.conversation_name.clone())
301                .unwrap_or_else(|_| "<poisoned>".to_string());
302            info!(
303                conversation = %conv_name,
304                error = %e,
305                "post-recovery election deferred"
306            );
307        }
308    }
309
310    /// Protocol-side teardown for `LeaveConversation`: emit `Leaving` on
311    /// the session's bus and delete the local MLS state. The User-side
312    /// caller drops the entry from the registry and broadcasts
313    /// `ConversationLifecycle::Removed`.
314    fn prepare_self_leave(arc: &Arc<RwLock<Self>>) -> Result<(), UserError> {
315        arc.read_or_err("session")?
316            .emit_event(SessionEvent::Leaving);
317        // Bind to a let so the write guard is dropped before `mls.delete()`
318        // runs the storage I/O — otherwise the guard's `if let` scrutinee
319        // lifetime would keep every other task on this session blocked
320        // throughout the delete.
321        let taken_mls = arc.write_or_err("session")?.handle.take_mls();
322        if let Some(mut mls) = taken_mls {
323            mls.delete()?;
324        }
325        Ok(())
326    }
327
328    /// Peer broadcast a commit candidate. If we were in Working, enter
329    /// Freezing and — if we're a steward — build our own candidate too.
330    async fn on_commit_candidate_received(
331        arc: &Arc<RwLock<Self>>,
332        steward: &[u8],
333    ) -> Result<(), UserError> {
334        {
335            let conv_name = arc.read_or_err("session")?.conversation_name.clone();
336            tracing::debug!(
337                conversation = %conv_name,
338                steward = ?steward,
339                "candidate received from peer steward"
340            );
341        }
342        let (event, outbound) = {
343            let mut s = arc.write_or_err("session")?;
344            if s.handle.current_state() != ConversationState::Working {
345                return Ok(());
346            }
347
348            let event = s.start_freezing();
349            let epoch = s.handle.expect_mls()?.current_epoch()?;
350            s.handle.conversation.ensure_freeze_round(epoch);
351
352            let self_identity = Arc::clone(&s.self_identity);
353            let app_id = Arc::clone(&s.app_id);
354            let outbound = if s.handle.steward_list.is_steward(&self_identity) {
355                match s.handle.create_commit_candidate(&self_identity, &app_id) {
356                    Ok(packets) => packets,
357                    Err(e) => {
358                        error!(
359                            conversation = %s.conversation_name,
360                            error = %e,
361                            "own commit candidate build failed"
362                        );
363                        None
364                    }
365                }
366            } else {
367                None
368            };
369            (event, outbound)
370        };
371
372        arc.read_or_err("session")?
373            .emit_event(SessionEvent::PhaseChange(event));
374        if let Some(message) = outbound {
375            let transport = Arc::clone(arc.read_or_err("session")?.transport());
376            send_packet(&transport, message)?;
377        }
378        Ok(())
379    }
380
381    /// Apply a steward's `ConversationSync` when we're a joiner without a steward
382    /// list. Validates the proposed list against the members it carries
383    /// (not the full MLS set — the list may have been generated before we
384    /// existed), then applies list + protocol flags + timing + peer scores.
385    fn on_conversation_sync(
386        arc: &Arc<RwLock<Self>>,
387        sync: ConversationSync,
388    ) -> Result<(), UserError> {
389        let (members, current_epoch, local_default_peer_score, conversation_name) = {
390            let s = arc.read_or_err("session")?;
391            if s.handle.steward_list.current_list().is_some() {
392                return Ok(());
393            }
394            let mls = s.handle.expect_mls()?;
395            (
396                mls.members()?,
397                mls.current_epoch()?,
398                s.handle.scoring.default_score(),
399                s.conversation_name.clone(),
400            )
401        };
402        if !validate_conversation_sync(
403            &conversation_name,
404            &sync,
405            current_epoch,
406            &members,
407            local_default_peer_score,
408        )? {
409            return Ok(());
410        }
411
412        let sn = sync.steward_members.len();
413        arc.write_or_err("session")?
414            .apply_conversation_sync_to_entry(&sync)?;
415
416        info!(
417            conversation = %conversation_name,
418            election_epoch = sync.election_epoch,
419            stewards = sn,
420            scores = sync.peer_scores.len(),
421            timing = sync.timing.is_some(),
422            "conversation sync applied"
423        );
424        Ok(())
425    }
426
427    fn apply_conversation_sync_to_entry(
428        &mut self,
429        sync: &ConversationSync,
430    ) -> Result<(), UserError> {
431        let mut protocol_config =
432            StewardListConfig::new(sync.sn_min as usize, sync.sn_max as usize)?;
433        protocol_config.allow_subset_candidates = sync.allow_subset_candidates;
434
435        let sn = sync.steward_members.len();
436        self.handle.steward_list.set_config(protocol_config);
437        let _events = self.handle.steward_list.install_list(
438            sync.election_epoch,
439            &sync.steward_members,
440            sn,
441            sync.retry_round,
442        )?;
443        self.handle
444            .steward_list
445            .set_max_retries(sync.max_reelection_attempts);
446        self.handle.scoring.set_threshold(sync.threshold_peer_score);
447        let snapshot = ScoreSnapshot {
448            diverged: sync
449                .peer_scores
450                .iter()
451                .map(|ps| (ps.member_id.clone(), ps.score))
452                .collect(),
453        };
454        // The ConversationSync sender (an existing steward) holds the same
455        // scores and is the canonical actor for any below-threshold
456        // member in this snapshot — they'll submit
457        // `SCORE_BELOW_THRESHOLD` from their own event chain. Drop our
458        // events to avoid duplicate proposals from joiners.
459        let _events = self.handle.scoring.apply_snapshot(&snapshot);
460        self.handle.config.liveness_criteria_yes = sync.liveness_criteria_yes;
461        self.handle.config.pending_update_max_epochs = sync.pending_update_max_epochs;
462        if let Some(timing) = &sync.timing {
463            self.handle.config.apply_timing(timing);
464        }
465        Ok(())
466    }
467}
468
469/// Returns `true` when the sync is acceptable for application. Logs the
470/// rejection reason on `false`.
471///
472/// `members` is the joiner's current MLS member set; ghost stewards
473/// (removed since the list was elected) are tolerated as long as at
474/// least one listed steward is still present.
475///
476/// `local_default_peer_score` is the joiner's configured starting score
477/// for new members (not synced; per-node). Rejecting when it sits at
478/// or below the synced threshold prevents a misconfiguration where every
479/// new member added by this joiner starts already eligible for removal.
480fn validate_conversation_sync(
481    conversation_name: &str,
482    sync: &ConversationSync,
483    current_epoch: u64,
484    members: &[Vec<u8>],
485    local_default_peer_score: i64,
486) -> Result<bool, UserError> {
487    if sync.election_epoch > current_epoch {
488        info!(
489            conversation = conversation_name,
490            election_epoch = sync.election_epoch,
491            current_epoch,
492            "conversation sync rejected: election_epoch > current_epoch"
493        );
494        return Ok(false);
495    }
496
497    let members_set = member_set(members);
498    let any_present = sync
499        .steward_members
500        .iter()
501        .any(|s| members_set.contains(s.as_slice()));
502    let ordering_valid = StewardList::validate(
503        &sync.steward_members,
504        sync.election_epoch,
505        conversation_name.as_bytes(),
506        &sync.steward_members,
507        &StewardListConfig::new(sync.sn_min as usize, sync.sn_max as usize)?,
508        sync.retry_round,
509    )?;
510    if !(any_present && ordering_valid) {
511        info!(
512            conversation = conversation_name,
513            any_present,
514            ordering = ordering_valid,
515            "conversation sync rejected: invalid"
516        );
517        return Ok(false);
518    }
519
520    if let Some(timing) = &sync.timing
521        && let Some(zero_field) = first_zero_timing_field(timing)
522    {
523        info!(
524            conversation = conversation_name,
525            field = zero_field,
526            "conversation sync rejected: zero-valued timing field"
527        );
528        return Ok(false);
529    }
530
531    if local_default_peer_score <= sync.threshold_peer_score {
532        info!(
533            conversation = conversation_name,
534            local_default_peer_score,
535            threshold_peer_score = sync.threshold_peer_score,
536            "conversation sync rejected: default_peer_score at or below threshold would mark new members removable on add"
537        );
538        return Ok(false);
539    }
540    Ok(true)
541}
542
543/// Name of the first zero-valued field in `timing`, or `None` if all
544/// fields are non-zero. Zero in any timing field would short-circuit the
545/// timer it drives (consensus_timeout firing immediately,
546/// commit_inactivity breaking the inactivity tracker, etc.).
547fn first_zero_timing_field(timing: &TimingConfig) -> Option<&'static str> {
548    if timing.commit_inactivity_duration_ms == 0 {
549        Some("commit_inactivity_duration_ms")
550    } else if timing.freeze_duration_ms == 0 {
551        Some("freeze_duration_ms")
552    } else if timing.proposal_expiration_ms == 0 {
553        Some("proposal_expiration_ms")
554    } else if timing.consensus_timeout_ms == 0 {
555        Some("consensus_timeout_ms")
556    } else if timing.recovery_inactivity_duration_ms == 0 {
557        Some("recovery_inactivity_duration_ms")
558    } else {
559        None
560    }
561}
562
563#[cfg(test)]
564mod tests {
565    use super::*;
566    use crate::protos::de_mls::messages::v1::TimingConfig;
567
568    fn nonzero_timing() -> TimingConfig {
569        TimingConfig {
570            commit_inactivity_duration_ms: 60_000,
571            freeze_duration_ms: 30_000,
572            proposal_expiration_ms: 3_600_000,
573            consensus_timeout_ms: 30_000,
574            recovery_inactivity_duration_ms: 5_000,
575        }
576    }
577
578    #[test]
579    fn nonzero_timing_passes() {
580        assert!(first_zero_timing_field(&nonzero_timing()).is_none());
581    }
582
583    fn valid_sync_with(threshold: i64) -> ConversationSync {
584        ConversationSync {
585            steward_members: vec![b"alice".to_vec()],
586            election_epoch: 0,
587            sn_min: 1,
588            sn_max: 5,
589            allow_subset_candidates: false,
590            peer_scores: vec![],
591            timing: Some(nonzero_timing()),
592            retry_round: 0,
593            max_reelection_attempts: 1,
594            liveness_criteria_yes: true,
595            threshold_peer_score: threshold,
596            pending_update_max_epochs: 3,
597        }
598    }
599
600    /// Joiner's `default_peer_score` strictly above the synced threshold
601    /// — new members added by this joiner start safely above the bar.
602    #[test]
603    fn validate_accepts_default_above_threshold() {
604        let sync = valid_sync_with(0);
605        assert!(validate_conversation_sync("g", &sync, 0, &[b"alice".to_vec()], 100).unwrap());
606    }
607
608    /// Joiner's `default_peer_score` equal to the threshold — new members
609    /// would start at threshold and `score <= threshold` already qualifies
610    /// them for removal.
611    #[test]
612    fn validate_rejects_default_equal_to_threshold() {
613        let sync = valid_sync_with(50);
614        assert!(!validate_conversation_sync("g", &sync, 0, &[b"alice".to_vec()], 50).unwrap());
615    }
616
617    /// Joiner's `default_peer_score` below the threshold — every new
618    /// member added by this joiner starts removable.
619    #[test]
620    fn validate_rejects_default_below_threshold() {
621        let sync = valid_sync_with(100);
622        assert!(!validate_conversation_sync("g", &sync, 0, &[b"alice".to_vec()], 50).unwrap());
623    }
624
625    #[test]
626    fn each_zero_field_is_detected() {
627        let cases = [
628            (
629                "commit_inactivity_duration_ms",
630                TimingConfig {
631                    commit_inactivity_duration_ms: 0,
632                    ..nonzero_timing()
633                },
634            ),
635            (
636                "freeze_duration_ms",
637                TimingConfig {
638                    freeze_duration_ms: 0,
639                    ..nonzero_timing()
640                },
641            ),
642            (
643                "proposal_expiration_ms",
644                TimingConfig {
645                    proposal_expiration_ms: 0,
646                    ..nonzero_timing()
647                },
648            ),
649            (
650                "consensus_timeout_ms",
651                TimingConfig {
652                    consensus_timeout_ms: 0,
653                    ..nonzero_timing()
654                },
655            ),
656            (
657                "recovery_inactivity_duration_ms",
658                TimingConfig {
659                    recovery_inactivity_duration_ms: 0,
660                    ..nonzero_timing()
661                },
662            ),
663        ];
664        for (name, timing) in cases {
665            assert_eq!(
666                first_zero_timing_field(&timing),
667                Some(name),
668                "expected field {name} to be detected as zero"
669            );
670        }
671    }
672}