//! # Topics (pub/sub-style)
//!
//! Topics are **not** a separate network bus: they are still **serialized through the actor
//! mailbox**, just like ordinary messages. That keeps topic state and handler state consistent,
//! but it also means ordering and latency follow the same queue rules.
//!
//! ## Building a topic
//!
//! 1. Pick a marker type and implement [`crate::Topic`] with `type Item = …` for the payload
//! broadcast to subscribers. The `Item` type must implement [`Clone`] because each waiting subscriber gets
//! its own copy on [`crate::TopicEndpoint::publish`].
//! 2. Store a [`crate::TopicEndpoint`] field on your [`crate::Service`] (often `Default`-initialized).
//! 3. Implement [`crate::RoutedTopic`] for your marker type so `endpoint(service)` returns
//! `&mut` that field. Keep routing **stable**: the same topic type must always resolve to the
//! same field for a given service implementation.
//!
//! ```rust,no_run
//! use async_trait::async_trait;
//! use serviceless::{
//!     Context, EmptyStream, Handler, Message, RoutedTopic, Service, Topic, TopicEndpoint,
//! };
//!
//! // Payload delivered to subscribers; `Clone` is required because each waiter gets its own copy.
//! #[derive(Clone)]
//! pub struct UserReady(pub String);
//!
//! // Marker type that identifies the topic.
//! pub struct UserReadyTopic;
//!
//! impl Topic for UserReadyTopic {
//!     type Item = UserReady;
//! }
//!
//! #[derive(Default)]
//! pub struct MyService {
//!     // Endpoint that holds the pending waiters for `UserReadyTopic`.
//!     pub user_ready: TopicEndpoint<UserReadyTopic>,
//! }
//!
//! #[async_trait]
//! impl Service for MyService {
//!     type Stream = EmptyStream<Self>;
//! }
//!
//! // Routing: `UserReadyTopic` always resolves to the `user_ready` field.
//! impl RoutedTopic<MyService> for UserReadyTopic {
//!     fn endpoint(service: &mut MyService) -> &mut TopicEndpoint<Self> {
//!         &mut service.user_ready
//!     }
//! }
//!
//! pub struct DoWork;
//!
//! impl Message for DoWork {
//!     type Result = ();
//! }
//!
//! #[async_trait]
//! impl Handler<DoWork> for MyService {
//!     async fn handle(&mut self, _message: DoWork, ctx: &mut Context<Self, Self::Stream>) {
//!         // Publish from inside the actor; the result is deliberately ignored here.
//!         let _ = ctx.publish::<UserReadyTopic>(UserReady("done".into()));
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let service = MyService::default();
//!     let (addr, run) = service.start_by_context(Context::new());
//!     tokio::spawn(run);
//!
//!     // Same enqueue/await pattern as `examples/topic.rs`: subscribe is enqueued before the
//!     // message whose handler publishes (see "Caveats and patterns" below for the ordering rules).
//!     let ready = addr.subscribe::<UserReadyTopic>();
//!     addr.send(DoWork).expect("send work");
//!     let event = ready.await.expect("event");
//!     assert_eq!(event.0, "done");
//! }
//! ```
//!
//! ## Publishing
//!
//! - **Inside the actor** (common): [`crate::Context::publish`] enqueues a publish envelope. When
//! it is processed, [`crate::TopicEndpoint::publish`] runs and wakes every waiter registered at
//! that moment, then **clears** the waiter list.
//! - Publishing is therefore a **one-shot fan-out**: each `subscribe` call receives at most **one**
//! item, from the next publish processed after it registers; to receive another item, subscribers
//! must `subscribe` again.
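//!
//! The one-shot fan-out can be pictured with a small standalone model. This is only a sketch built
//! on `std::sync::mpsc`: `ModelEndpoint` and `fan_out_demo` are hypothetical names used for
//! illustration, they are not part of this crate, and the real [`crate::TopicEndpoint`] may be
//! implemented differently.
//!
//! ```rust
//! use std::sync::mpsc::{channel, Receiver, Sender};
//!
//! // Hypothetical stand-in for a topic endpoint: a list of one-shot waiters.
//! struct ModelEndpoint<T: Clone> {
//!     waiters: Vec<Sender<T>>,
//! }
//!
//! impl<T: Clone> ModelEndpoint<T> {
//!     fn new() -> Self {
//!         ModelEndpoint { waiters: Vec::new() }
//!     }
//!
//!     // Register a waiter; the returned receiver yields at most one item.
//!     fn subscribe(&mut self) -> Receiver<T> {
//!         let (tx, rx) = channel();
//!         self.waiters.push(tx);
//!         rx
//!     }
//!
//!     // Send a clone to every current waiter and empty the waiter list.
//!     // Returns how many waiters actually received the item.
//!     fn publish(&mut self, item: T) -> usize {
//!         self.waiters
//!             .drain(..)
//!             .filter(|tx| tx.send(item.clone()).is_ok())
//!             .count()
//!     }
//! }
//!
//! // Two subscribers receive the first publish; the second publish finds no waiters.
//! fn fan_out_demo() -> (usize, usize, Vec<String>) {
//!     let mut endpoint = ModelEndpoint::new();
//!     let a = endpoint.subscribe();
//!     let b = endpoint.subscribe();
//!
//!     let first = endpoint.publish("done".to_string());
//!     // Nobody subscribed again, so this item is delivered to no one.
//!     let second = endpoint.publish("again".to_string());
//!
//!     let received = vec![a.recv().unwrap(), b.recv().unwrap()];
//!     (first, second, received)
//! }
//! ```
//!
//! In this model `fan_out_demo()` reports 2 deliveries for the first publish and 0 for the second,
//! and both receivers hold their own copy of `"done"`: the waiter list is emptied by every publish.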
//!
//! ## Subscribing
//!
//! - **From outside:** [`crate::ServiceAddress::subscribe`] sends a subscribe envelope; when it
//! runs, it registers the caller’s one-shot channel on the endpoint. Await the future to obtain
//! the next published `Item` (or [`crate::Error::ServiceStoped`] if the actor stops).
//!
//! ## Caveats and patterns
//!
//! - **Ordering:** `subscribe` and `publish` both pass through the mailbox. If one task `send`s a
//! message whose handler publishes while another task calls `subscribe`, nothing orders the two
//! envelopes, so you can race: the publish envelope might be processed before the subscribe
//! envelope, and that subscriber misses the item. When you need a strict “subscribe then publish”
//! handshake, **sequence** those operations (e.g. call `subscribe` before sending the message
//! that publishes, from the same task as in the example above, or perform both from the same
//! actor/handler).
//! - **Missed events:** there is **no** buffer of past publications for late subscribers—only
//! waiters registered at publish time receive the item.
//! - **Cleanup:** [`crate::TopicEndpoint::clear`] drops pending waiters (their receivers observe a
//! closed channel). Dropped receivers do not block publish; they are skipped.
//! - **Stopping:** when the service stops, pending topic operations fail like other mailbox
//! operations.
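//!
//! The ordering and missed-event caveats can be reproduced with a toy FIFO mailbox. This is a
//! sketch under stated assumptions: `Envelope`, `run_mailbox`, `subscribe_then_publish`, and
//! `publish_then_subscribe` are hypothetical names, and the model only mimics the queue behavior
//! described in this section, not the crate's actual internals.
//!
//! ```rust
//! use std::collections::VecDeque;
//! use std::sync::mpsc::{channel, Receiver, Sender};
//!
//! // Hypothetical envelopes: both kinds travel through the same FIFO queue.
//! enum Envelope {
//!     Subscribe(Sender<String>),
//!     Publish(String),
//! }
//!
//! // Process envelopes strictly in queue order, like a single actor loop.
//! fn run_mailbox(mut mailbox: VecDeque<Envelope>) {
//!     let mut waiters: Vec<Sender<String>> = Vec::new();
//!     while let Some(envelope) = mailbox.pop_front() {
//!         match envelope {
//!             Envelope::Subscribe(tx) => waiters.push(tx),
//!             Envelope::Publish(item) => {
//!                 // One-shot fan-out: notify the current waiters, then forget them.
//!                 for tx in waiters.drain(..) {
//!                     // A send error means the receiver was dropped; skip it.
//!                     let _ = tx.send(item.clone());
//!                 }
//!             }
//!         }
//!     }
//! }
//!
//! // Subscribe is enqueued first, so the waiter exists when the publish runs.
//! fn subscribe_then_publish() -> Option<String> {
//!     let (tx, rx): (Sender<String>, Receiver<String>) = channel();
//!     let mut mailbox = VecDeque::new();
//!     mailbox.push_back(Envelope::Subscribe(tx));
//!     mailbox.push_back(Envelope::Publish("done".to_string()));
//!     run_mailbox(mailbox);
//!     rx.recv().ok()
//! }
//!
//! // Publish is enqueued first: no waiter is registered yet, so the item is lost.
//! fn publish_then_subscribe() -> Option<String> {
//!     let (tx, rx): (Sender<String>, Receiver<String>) = channel();
//!     let mut mailbox = VecDeque::new();
//!     mailbox.push_back(Envelope::Publish("done".to_string()));
//!     mailbox.push_back(Envelope::Subscribe(tx));
//!     run_mailbox(mailbox);
//!     // The model mailbox has finished, so the pending waiter sees a closed channel.
//!     rx.recv().ok()
//! }
//! ```
//!
//! In the model, `subscribe_then_publish()` yields `Some("done")` while `publish_then_subscribe()`
//! yields `None`: the late waiter never sees the earlier item and only observes the channel
//! closing once the mailbox is gone.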
//!
//! The repository’s `examples/topic.rs` shows the same wiring with assertions in a small demo.