rlmesh-grpc 0.1.0-rc.1

Internal RLMesh crate (unstable Rust API): gRPC clients, servers, and wire helpers.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
//! Episode tracking for evaluation.

use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use uuid::Uuid;

use rlmesh_proto::env::v1::EpisodeMetadata;
use rlmesh_proto::spaces::v1::MetaMap;

/// Current wall-clock time, expressed as nanoseconds since the Unix epoch.
///
/// Two edge cases never wrap or panic:
/// - A system clock set before the epoch yields `0`.
/// - A nanosecond count too large for `i64` (around the year 2262) is clamped
///   to `i64::MAX`.
fn unix_nanos_now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX),
        Err(_) => 0,
    }
}

/// Single episode state (internal to the tracker).
struct Episode {
    /// Randomly generated (UUID v4) identifier for this episode.
    id: String,
    /// Seed given to the reset that started this episode; `None` when the reset
    /// carried no explicit seed.
    seed: Option<i64>,
    /// Index of the environment lane the episode runs on.
    env_index: i32,
    /// Number of steps recorded so far.
    step_count: i64,
    /// Sum of all rewards recorded so far.
    cumulative_reward: f64,
    /// Monotonic start instant, used to measure the episode's duration.
    start_time: Instant,
    /// Wall-clock start time in nanoseconds since the Unix epoch.
    start_timestamp_ns: i64,
}

impl Episode {
    /// Begin a fresh episode on `env_index` with zeroed counters.
    ///
    /// Both the monotonic start instant and the wall-clock start timestamp are
    /// captured here.
    fn new(env_index: i32, seed: Option<i64>) -> Self {
        let start_time = Instant::now();
        let start_timestamp_ns = unix_nanos_now();

        Self {
            id: Uuid::new_v4().to_string(),
            seed,
            env_index,
            step_count: 0,
            cumulative_reward: 0.0,
            start_time,
            start_timestamp_ns,
        }
    }

    /// Count one step and add its `reward` to the running total.
    fn record_step(&mut self, reward: f64) {
        self.step_count += 1;
        self.cumulative_reward += reward;
    }

    /// Consume the episode and produce its final metadata.
    ///
    /// Where the two time fields come from:
    /// - `duration_ms` is measured with the monotonic [`Instant`] clock.
    /// - The start/end timestamps come from the wall clock.
    ///
    /// Because the clocks differ, the two may not agree exactly if the wall
    /// clock is adjusted mid-episode.
    fn complete(
        self,
        terminated: bool,
        truncated: bool,
        final_info: Option<MetaMap>,
    ) -> EpisodeMetadata {
        let end_timestamp_ns = unix_nanos_now();

        // `as_millis` returns `u128`. Saturate rather than silently truncate with
        // `as`, matching the overflow policy of `unix_nanos_now`.
        let duration_ms = i64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(i64::MAX);

        EpisodeMetadata {
            episode_id: self.id,
            seed: self.seed,
            env_index: self.env_index,
            step_count: self.step_count,
            cumulative_reward: self.cumulative_reward,
            terminated,
            truncated,
            start_timestamp_ns: self.start_timestamp_ns,
            end_timestamp_ns,
            duration_ms,
            final_info,
        }
    }
}

/// Upper bound on the number of interrupted episodes held between drains.
///
/// A client that only ever resets can create interrupted episodes faster than
/// they are reported. Capping the buffer bounds both memory use and response
/// size. Eviction drops the oldest entries, so the most recent interruptions
/// are the ones that survive.
const MAX_INTERRUPTED_EPISODES: usize = 100;

/// Where a single lane sits in the episode lifecycle.
///
/// The step handler consults this to enforce the NEXT_STEP autoreset contract
/// rather than inferring it from `was_active`. Exactly one variant holds for a
/// lane at any moment:
/// - [`Active`](LaneState::Active): a tracked episode is in progress.
/// - [`PendingAutoreset`](LaneState::PendingAutoreset): the lane finished under
///   NEXT_STEP. The env owes exactly one fresh autoreset observation on the
///   following step (see [`EpisodeTracker::expect_autoreset`]).
/// - [`Idle`](LaneState::Idle): nothing is running and nothing is owed. The lane
///   waits for an explicit reset; this is the usual DISABLED state after
///   completion.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LaneState {
    /// An episode is currently running on this lane.
    Active,
    /// The lane finished under NEXT_STEP. Its next step must deliver a fresh
    /// autoreset observation.
    PendingAutoreset,
    /// Neither an active episode nor an outstanding autoreset.
    Idle,
}

/// Manages episode tracking for all environments.
pub struct EpisodeTracker {
    active: HashMap<i32, Episode>,
    /// Lanes that completed under NEXT_STEP and are awaiting their fresh
    /// autoreset observation on the next step. Disjoint from `active`: a lane is
    /// inserted here only after its episode completes and is cleared when the
    /// autoreset rolls the next episode (or an explicit reset starts one).
    pending_autoreset: HashSet<i32>,
    /// Episodes interrupted by a replacing reset, awaiting the next drain.
    /// Bounded by [`MAX_INTERRUPTED_EPISODES`] with oldest-first eviction.
    interrupted: Vec<EpisodeMetadata>,
    /// Count of interrupted episodes dropped because the buffer was full since
    /// the last drain, surfaced in a single warn log when the drain happens.
    interrupted_dropped: u64,
}

impl EpisodeTracker {
    /// Create a new episode tracker.
    pub fn new() -> Self {
        Self {
            active: HashMap::new(),
            pending_autoreset: HashSet::new(),
            interrupted: Vec::new(),
            interrupted_dropped: 0,
        }
    }

    /// Buffer an interrupted episode, evicting the oldest entry (and counting
    /// the drop) when the buffer is already at [`MAX_INTERRUPTED_EPISODES`]. A
    /// client that loops `Reset` without `Step` cannot grow this buffer without
    /// bound; the dropped count is surfaced when the buffer is next drained.
    fn push_interrupted(&mut self, metadata: EpisodeMetadata) {
        if self.interrupted.len() >= MAX_INTERRUPTED_EPISODES {
            // Evict oldest-first so the most recent interruptions are retained.
            self.interrupted.remove(0);
            self.interrupted_dropped = self.interrupted_dropped.saturating_add(1);
        }
        self.interrupted.push(metadata);
    }

    /// Take the buffered interrupted episodes, logging any that were dropped
    /// because the buffer overflowed since the last drain.
    fn take_interrupted(&mut self) -> Vec<EpisodeMetadata> {
        if self.interrupted_dropped > 0 {
            tracing::warn!(
                dropped = self.interrupted_dropped,
                cap = MAX_INTERRUPTED_EPISODES,
                "interrupted-episode buffer overflowed; oldest interrupted episodes were \
                 dropped before they could be delivered (a client looping Reset without Step \
                 grows this buffer); their accounting is lost"
            );
            self.interrupted_dropped = 0;
        }
        std::mem::take(&mut self.interrupted)
    }

    /// Start a new episode for the given environment index.
    ///
    /// `seed` is `None` when the environment was reset without an explicit seed
    /// (it seeded itself from entropy), so the episode metadata honestly records
    /// the absence of a seed instead of fabricating one.
    ///
    /// Returns the episode ID.
    pub fn start_episode(&mut self, env_index: i32, seed: Option<i64>) -> String {
        let episode = Episode::new(env_index, seed);
        let episode_id = episode.id.clone();

        // Starting an episode (autoreset roll or explicit reset) discharges any
        // pending-autoreset expectation for this lane.
        self.pending_autoreset.remove(&env_index);

        // A reset can legitimately interrupt an in-flight episode (manual
        // vector reset, runtime reset racing a lane autoreset). Complete the
        // replaced episode as truncated and buffer it so the next drain point
        // (step completed_episodes / close final_episodes) surfaces it instead
        // of silently dropping its accounting.
        if let Some(old_episode) = self.active.insert(env_index, episode) {
            tracing::debug!(
                "Episode {} for env {} interrupted by a new episode; completing as truncated",
                old_episode.id,
                env_index
            );
            self.push_interrupted(old_episode.complete(false, true, None));
        }

        episode_id
    }

    /// Drain episodes that were interrupted by a replacing reset since the
    /// last drain. Callers fold these into the completed-episode stream so
    /// interrupted episodes surface exactly once.
    pub fn drain_interrupted(&mut self) -> Vec<EpisodeMetadata> {
        self.take_interrupted()
    }

    /// Record a step for the given environment.
    pub fn record_step(&mut self, env_index: i32, reward: f64) {
        if let Some(episode) = self.active.get_mut(&env_index) {
            episode.record_step(reward);
        } else {
            tracing::warn!(
                "Attempted to record step for env {} with no active episode",
                env_index
            );
        }
    }

    /// Complete an episode and return its metadata.
    ///
    /// Returns None if no episode was active for the given environment.
    pub fn complete_episode(
        &mut self,
        env_index: i32,
        terminated: bool,
        truncated: bool,
        final_info: Option<MetaMap>,
    ) -> Option<EpisodeMetadata> {
        let episode = self.active.remove(&env_index)?;
        Some(episode.complete(terminated, truncated, final_info))
    }

    /// Complete all active episodes (e.g., on cancellation).
    ///
    /// Returns metadata for all episodes that were active.
    pub fn complete_all(&mut self, reason: &str) -> Vec<EpisodeMetadata> {
        tracing::info!(
            "Completing all {} active episodes: {}",
            self.active.len(),
            reason
        );

        let mut completed = self.take_interrupted();
        for (_env_index, episode) in self.active.drain() {
            let metadata = episode.complete(false, true, None);
            completed.push(metadata);
        }
        // Pending lanes have no active episode left to complete; clear the
        // expectations so a fresh session starts clean.
        self.pending_autoreset.clear();

        completed
    }

    /// Get the active episode ID for a specific environment index.
    pub fn active_episode_id(&self, env_index: i32) -> Option<&str> {
        self.active
            .get(&env_index)
            .map(|episode| episode.id.as_str())
    }

    /// The lifecycle state of a lane: [`Active`](LaneState::Active) if a tracked
    /// episode is running, [`PendingAutoreset`](LaneState::PendingAutoreset) if
    /// it completed under NEXT_STEP and owes a fresh autoreset observation, else
    /// [`Idle`](LaneState::Idle). `active` takes precedence so a lane is never
    /// reported pending while an episode is running.
    pub fn lane_state(&self, env_index: i32) -> LaneState {
        if self.active.contains_key(&env_index) {
            LaneState::Active
        } else if self.pending_autoreset.contains(&env_index) {
            LaneState::PendingAutoreset
        } else {
            LaneState::Idle
        }
    }

    /// Mark a lane as expecting a fresh autoreset observation on the next step.
    ///
    /// Call this immediately after [`complete_episode`](Self::complete_episode)
    /// for a NEXT_STEP env: the lane is now inactive, and its next step must be
    /// the env's fresh (non-terminal, reward-0) autoreset observation, at which
    /// point [`start_episode`](Self::start_episode) rolls the next episode and
    /// clears the expectation.
    pub fn expect_autoreset(&mut self, env_index: i32) {
        self.pending_autoreset.insert(env_index);
    }
}

impl Default for EpisodeTracker {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Number of lanes that currently have a running episode.
    fn active_count(tracker: &EpisodeTracker) -> usize {
        tracker.active.len()
    }

    #[test]
    fn test_episode_lifecycle() {
        let mut tracker = EpisodeTracker::new();

        // Start episode
        let ep_id = tracker.start_episode(0, Some(42));
        assert_eq!(active_count(&tracker), 1);

        // Record steps
        tracker.record_step(0, 1.0);
        tracker.record_step(0, 2.5);

        // Complete episode
        let metadata = tracker.complete_episode(0, true, false, None).unwrap();
        assert_eq!(metadata.episode_id, ep_id);
        assert_eq!(metadata.seed, Some(42));
        assert_eq!(metadata.env_index, 0);
        assert_eq!(metadata.step_count, 2);
        assert_eq!(metadata.cumulative_reward, 3.5);
        assert!(metadata.terminated);
        assert!(!metadata.truncated);

        assert_eq!(active_count(&tracker), 0);
    }

    #[test]
    fn lane_state_tracks_active_pending_and_idle() {
        let mut tracker = EpisodeTracker::new();

        // An unknown lane starts Idle.
        assert_eq!(tracker.lane_state(0), LaneState::Idle);

        // Starting an episode makes it Active.
        tracker.start_episode(0, None);
        assert_eq!(tracker.lane_state(0), LaneState::Active);

        // Completing it returns to Idle (no autoreset expected by default —
        // this is the DISABLED post-completion state).
        tracker.complete_episode(0, true, false, None);
        assert_eq!(tracker.lane_state(0), LaneState::Idle);

        // Under NEXT_STEP the handler marks the lane pending after completion.
        tracker.start_episode(0, None);
        tracker.complete_episode(0, true, false, None);
        tracker.expect_autoreset(0);
        assert_eq!(tracker.lane_state(0), LaneState::PendingAutoreset);

        // The fresh autoreset roll discharges the expectation and re-activates.
        tracker.start_episode(0, None);
        assert_eq!(tracker.lane_state(0), LaneState::Active);
    }

    #[test]
    fn active_episode_takes_precedence_over_pending_autoreset() {
        let mut tracker = EpisodeTracker::new();
        tracker.start_episode(0, None);
        // Defensive: even if a stale expectation is set, an active episode wins.
        tracker.expect_autoreset(0);
        assert_eq!(tracker.lane_state(0), LaneState::Active);
    }

    #[test]
    fn complete_all_clears_pending_autoreset() {
        let mut tracker = EpisodeTracker::new();
        tracker.start_episode(0, None);
        tracker.complete_episode(0, true, false, None);
        tracker.expect_autoreset(0);
        assert_eq!(tracker.lane_state(0), LaneState::PendingAutoreset);

        let _ = tracker.complete_all("close");
        assert_eq!(tracker.lane_state(0), LaneState::Idle);
    }

    #[test]
    fn interrupted_episode_is_completed_as_truncated_and_drained_once() {
        let mut tracker = EpisodeTracker::new();

        let first = tracker.start_episode(0, Some(7));
        tracker.record_step(0, 1.5);
        tracker.record_step(0, 2.5);

        // A replacing reset interrupts the in-flight episode: its accounting
        // must surface as a truncated completion instead of being dropped.
        let second = tracker.start_episode(0, None);
        assert_ne!(first, second);
        assert_eq!(active_count(&tracker), 1);

        let interrupted = tracker.drain_interrupted();
        assert_eq!(interrupted.len(), 1);
        assert_eq!(interrupted[0].episode_id, first);
        assert_eq!(interrupted[0].step_count, 2);
        assert_eq!(interrupted[0].cumulative_reward, 4.0);
        assert!(!interrupted[0].terminated);
        assert!(interrupted[0].truncated);

        // Exactly once: a second drain is empty.
        assert!(tracker.drain_interrupted().is_empty());
    }

    #[test]
    fn complete_all_includes_undrained_interrupted_episodes() {
        let mut tracker = EpisodeTracker::new();

        let first = tracker.start_episode(0, Some(1));
        tracker.record_step(0, 1.0);
        let second = tracker.start_episode(0, None);

        let all = tracker.complete_all("client close");

        // Both the interrupted and the still-active episode are reported.
        let mut got: Vec<&str> = all.iter().map(|m| m.episode_id.as_str()).collect();
        got.sort_unstable();
        let mut expected = vec![first.as_str(), second.as_str()];
        expected.sort_unstable();
        assert_eq!(got, expected);

        // Every episode surfaces as truncated, and the interrupted one keeps
        // the accounting it had accumulated before the replacing reset.
        assert!(all.iter().all(|m| m.truncated && !m.terminated));
        let interrupted = all
            .iter()
            .find(|m| m.episode_id == first)
            .expect("interrupted episode must be reported by complete_all");
        assert_eq!(interrupted.step_count, 1);
        assert_eq!(interrupted.cumulative_reward, 1.0);
        assert_eq!(interrupted.seed, Some(1));

        assert!(tracker.drain_interrupted().is_empty());
    }

    #[test]
    fn interrupted_buffer_is_bounded_under_repeated_reset_without_step() {
        let mut tracker = EpisodeTracker::new();

        let resets = MAX_INTERRUPTED_EPISODES * 5;
        for _ in 0..resets {
            tracker.start_episode(0, None);
            // The buffer must never grow past the cap, regardless of reset count.
            assert!(
                tracker.interrupted.len() <= MAX_INTERRUPTED_EPISODES,
                "interrupted buffer exceeded its cap"
            );
        }

        // It sits exactly at the cap (the very first reset created the active
        // episode without interrupting anything; every subsequent reset added an
        // interrupted entry, then eviction held it at the cap).
        assert_eq!(tracker.interrupted.len(), MAX_INTERRUPTED_EPISODES);
        // The overflow was counted so the next drain can warn.
        assert!(tracker.interrupted_dropped > 0);

        // Draining empties the buffer and clears the dropped counter (the warn
        // log fires here), and yields at most the cap many episodes — never the
        // whole unbounded backlog at once.
        let drained = tracker.drain_interrupted();
        assert_eq!(drained.len(), MAX_INTERRUPTED_EPISODES);
        assert_eq!(tracker.interrupted_dropped, 0);
        assert!(tracker.drain_interrupted().is_empty());
    }

    #[test]
    fn interrupted_buffer_evicts_oldest_first_and_counts_drops() {
        let mut tracker = EpisodeTracker::new();

        // Reset `k` is seeded with `k` and interrupts the episode started by
        // reset `k - 1`. So `resets - 1` episodes (seeds 0..resets-1) are
        // interrupted in order, and the oldest ones beyond the cap are dropped.
        let resets = MAX_INTERRUPTED_EPISODES + 10;
        for seed in 0..resets {
            tracker.start_episode(0, Some(seed as i64));
        }
        let dropped = resets - 1 - MAX_INTERRUPTED_EPISODES;
        assert_eq!(tracker.interrupted_dropped, dropped as u64);

        // The survivors are exactly the most recent interruptions, oldest first.
        let drained = tracker.drain_interrupted();
        let seeds: Vec<Option<i64>> = drained.iter().map(|m| m.seed).collect();
        let expected: Vec<Option<i64>> = (dropped..resets - 1).map(|s| Some(s as i64)).collect();
        assert_eq!(seeds, expected);
        assert_eq!(tracker.interrupted_dropped, 0);
    }

    #[test]
    fn test_vectorized_episodes() {
        let mut tracker = EpisodeTracker::new();

        // Start multiple episodes
        let ep0 = tracker.start_episode(0, Some(100));
        let ep1 = tracker.start_episode(1, Some(200));
        let ep2 = tracker.start_episode(2, Some(300));
        assert_eq!(active_count(&tracker), 3);

        // Record steps for each
        tracker.record_step(0, 1.0);
        tracker.record_step(1, 2.0);
        tracker.record_step(2, 3.0);

        // Complete env 1
        let meta1 = tracker.complete_episode(1, true, false, None).unwrap();
        assert_eq!(meta1.episode_id, ep1);
        assert_eq!(meta1.cumulative_reward, 2.0);
        assert_eq!(active_count(&tracker), 2);

        // Complete env 0
        let meta0 = tracker.complete_episode(0, false, true, None).unwrap();
        assert_eq!(meta0.episode_id, ep0);
        assert!(!meta0.terminated);
        assert!(meta0.truncated);
        assert_eq!(active_count(&tracker), 1);

        // Complete env 2: lanes keep independent accounting.
        let meta2 = tracker.complete_episode(2, true, false, None).unwrap();
        assert_eq!(meta2.episode_id, ep2);
        assert_eq!(meta2.env_index, 2);
        assert_eq!(meta2.seed, Some(300));
        assert_eq!(meta2.cumulative_reward, 3.0);
        assert_eq!(active_count(&tracker), 0);
    }

    #[test]
    fn test_complete_all() {
        let mut tracker = EpisodeTracker::new();

        tracker.start_episode(0, Some(1));
        tracker.start_episode(1, Some(2));
        tracker.start_episode(2, Some(3));

        tracker.record_step(0, 1.0);
        tracker.record_step(1, 2.0);

        let completed = tracker.complete_all("test cancellation");
        assert_eq!(completed.len(), 3);
        assert_eq!(active_count(&tracker), 0);

        // All should be truncated (not terminated)
        for meta in completed {
            assert!(!meta.terminated);
            assert!(meta.truncated);
        }
    }

    #[test]
    fn unseeded_episode_leaves_seed_unset_not_fabricated_zero() {
        let mut tracker = EpisodeTracker::new();

        // Reset without an explicit seed (the env seeded itself from entropy).
        tracker.start_episode(0, None);
        let meta = tracker.complete_episode(0, true, false, None).unwrap();

        // The seed must be absent, never a fabricated 0 that downstream
        // consumers would mistake for a real seed.
        assert_eq!(meta.seed, None);

        // An explicit seed of 0 is still recorded faithfully as Some(0).
        tracker.start_episode(1, Some(0));
        let meta = tracker.complete_episode(1, true, false, None).unwrap();
        assert_eq!(meta.seed, Some(0));
    }
}