# Topic Internal Design
## Table of Contents
1. [Introduction](#introduction)
2. [Architecture Overview](#architecture-overview)
3. [Data Structures](#data-structures)
4. [Component Interactions](#component-interactions)
5. [Publish/Subscribe Flow](#publishsubscribe-flow)
6. [Memory Management](#memory-management)
7. [Thread State Management](#thread-state-management)
8. [Synchronization and Concurrency](#synchronization-and-concurrency)
9. [Error Handling](#error-handling)
10. [Kernel Call Handlers](#kernel-call-handlers)
11. [User-Space API](#user-space-api)
12. [Implementation Notes](#implementation-notes)
13. [Future Improvements](#future-improvements)
## Introduction
The Topic subsystem provides an **asynchronous publish/subscribe** communication mechanism between processes. It implements a one-to-many messaging pattern in which publishers send messages to named topics and subscribers receive messages from those topics. Publishing is asynchronous in the sense that a publisher returns immediately as long as the topic's single message buffer is free; it blocks only if the previous message has not yet been consumed. Subscribers block until a message is available.
### Design Goals
- **Asynchronous Communication**: Publishers do not wait for subscribers to receive a message; they block only while a previous message is still unread
- **Named Topics**: Topics are identified by string names
- **Publish/Subscribe Pattern**: One-to-many message distribution
- **Memory Efficiency**: Single-copy message passing using shared memory
- **Thread Blocking**: Automatic thread blocking/unblocking for synchronization
- **Process Isolation**: Messages are passed between isolated process address spaces
### Document Scope
This document describes the internal design and implementation of the Topic subsystem. It covers both the user-space API (`library/tail_core/src/topic/`) and the kernel-side implementation (`kernel/src/ipc/topic/`). It details data structures, algorithms, memory management, and component interactions.
## Architecture Overview
### Component Structure
**User-Space (tail_core)**:
```
library/tail_core/src/topic/
├── mod.rs # Module exports
├── publisher.rs # Publisher API
└── subscriber.rs # Subscriber API
```
**Kernel-Space**:
```
kernel/src/ipc/topic/
├── mod.rs # Module exports
├── topic_directory.rs # Topic registry and directory
└── topic_kernel_layout.rs # Topic data structure and shared memory
```
**Kernel Call Handlers**:
```
kernel/src/kernel_call/
├── kernel_call_create_topic.rs # Topic creation handler
├── kernel_call_publish_to_topic.rs # Message publishing handler
└── kernel_call_subscribe_topic.rs # Message subscription handler
```
### System Integration
The Topic subsystem integrates with:
- **Process Manager**: For process identification and memory management
- **Thread Manager**: For thread state management and blocking/unblocking
- **Scheduler**: For thread scheduling and priority management
- **Memory Manager**: For memory mapping and address translation
- **Safe Shared Memory**: For cross-process memory sharing
- **Kernel Call Interface**: For user-space API
### High-Level Architecture
```
┌─────────────────────────────────────────────────────────────┐
│                         User Space                          │
│  ┌──────────────┐                     ┌──────────────┐      │
│  │  Publisher   │                     │  Subscriber  │      │
│  │   Process    │                     │   Process    │      │
│  └──────┬───────┘                     └──────┬───────┘      │
│         │                                    │              │
│         │ publish()                          │ subscribe()  │
│         │                                    │              │
└─────────┼────────────────────────────────────┼──────────────┘
          │                                    │
          └─────────────────┬──────────────────┘
                            │
             ┌──────────────▼───────────────┐
             │     Kernel Call Handlers     │
             │ - create_topic               │
             │ - publish_to_topic           │
             │ - subscribe_topic            │
             └──────────────┬───────────────┘
                            │
             ┌──────────────▼───────────────┐
             │       Topic Directory        │
             │ - Topic registry             │
             │ - Topic lookup               │
             └──────────────┬───────────────┘
                            │
             ┌──────────────▼───────────────┐
             │     Topic Kernel Layout      │
             │ - Shared memory              │
             │ - Publisher/Subscriber lists │
             │ - Data availability flag     │
             └──────────────────────────────┘
```
## Data Structures
### TopicKernelLayout
**Location**: `kernel/src/ipc/topic/topic_kernel_layout.rs`
Represents a topic instance in the kernel.
```rust
pub struct TopicKernelLayout {
    name: [u8; TOPIC_NAME_MAX_LENGTH],      // Topic name (max 255 bytes)
    shared_data: SafeSharedMemory,          // Shared memory for message data
    is_new_data_available: bool,            // True while a published message is unread
    new_data_size: usize,                   // Size of current message
    publishers: Vec<Arc<RefCell<Thread>>>,  // List of publisher threads
    subscribers: Vec<Arc<RefCell<Thread>>>, // List of subscriber threads
}
```
**Key Design Decisions**:
- Fixed-size topic name array (255 bytes) for efficient storage
- Single shared memory buffer for message passing (one message at a time)
- `is_new_data_available` flag prevents overwriting unread messages
- Publisher and subscriber lists for automatic thread wake-up
- Thread references stored for blocking/unblocking operations
**Operations**:
- `new()`: Create a new topic with shared memory allocation
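- `write()`: Write a message to the topic; fails if the previous message has not been read yet (the kernel call handler then blocks the publisher)
- `read()`: Read a message from the topic; fails if no new data is available (the kernel call handler then blocks the subscriber)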
- `is_thread_publisher()`: Check if thread is registered as publisher
- `is_thread_subscriber()`: Check if thread is registered as subscriber
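The `write()`/`read()` contract can be illustrated as a single-slot buffer guarded by the availability flag. The following is a user-space model under stated assumptions, not the kernel code: thread handles are plain `u64` IDs, `SafeSharedMemory` is replaced by a `Vec<u8>`, the size check and its error text are illustrative, and failures are returned as `Err` so that a caller (the kernel call handler) can decide to block the thread.

```rust
/// Thread IDs stand in for `Arc<RefCell<Thread>>` in this model.
pub type ThreadId = u64;

/// Single-slot model of TopicKernelLayout (illustrative only).
pub struct TopicModel {
    buffer: Vec<u8>,             // stands in for SafeSharedMemory
    is_new_data_available: bool, // true while a published message is unread
    new_data_size: usize,        // size of the pending message
    publishers: Vec<ThreadId>,
    subscribers: Vec<ThreadId>,
}

impl TopicModel {
    pub fn new(capacity: usize) -> Self {
        TopicModel {
            buffer: vec![0; capacity],
            is_new_data_available: false,
            new_data_size: 0,
            publishers: Vec::new(),
            subscribers: Vec::new(),
        }
    }

    /// Fails (instead of blocking) if the previous message has not been read.
    pub fn write(&mut self, writer: ThreadId, message: &[u8]) -> Result<(), &'static str> {
        if self.is_new_data_available {
            return Err("TopicKernelLayout::write() failed. There is data not read yet");
        }
        if message.len() > self.buffer.len() {
            return Err("message larger than shared memory"); // illustrative text
        }
        if !self.publishers.contains(&writer) {
            self.publishers.push(writer); // register on first use
        }
        self.buffer.fill(0); // clear before copying in the new message
        self.buffer[..message.len()].copy_from_slice(message);
        self.new_data_size = message.len();
        self.is_new_data_available = true;
        Ok(())
    }

    /// Registers the reader, then fails (instead of blocking) if nothing is pending.
    pub fn read(&mut self, reader: ThreadId) -> Result<&[u8], &'static str> {
        if !self.subscribers.contains(&reader) {
            self.subscribers.push(reader);
        }
        if !self.is_new_data_available {
            return Err("TopicKernelLayout::read() failed. There is no new data available");
        }
        self.is_new_data_available = false; // consuming the message frees the slot
        Ok(&self.buffer[..self.new_data_size])
    }

    pub fn is_thread_publisher(&self, t: ThreadId) -> bool {
        self.publishers.contains(&t)
    }

    pub fn is_thread_subscriber(&self, t: ThreadId) -> bool {
        self.subscribers.contains(&t)
    }
}
```

A second `write()` before any `read()` fails, which is exactly the condition under which the publish handler blocks the publisher.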
### SafeSharedMemory
**Location**: `kernel/src/ipc/safe_shared_memory.rs`
Manages shared memory across processes for topic message passing.
```rust
pub struct SafeSharedMemory {
    physical_address: *const u8, // Physical memory address
    pids: Vec<u64>,              // Process IDs using this memory
    virtual_addresses: Vec<u64>, // Virtual addresses per process
    size: usize,                 // Size of shared memory
}
```
**Key Design Decisions**:
- Single physical memory page shared across processes
- Per-process virtual address mapping
- Tracks all processes using the shared memory
- Enables efficient message passing with a single copy operation
**Operations**:
- `new()`: Allocate and initialize shared memory
- `write()`: Write data from a process to shared memory
- `read_virtual_address()`: Get virtual address for a process to read
- `map_shared_memory_to_process()`: Map shared memory to a new process
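The per-process bookkeeping in `SafeSharedMemory` (parallel `pids` and `virtual_addresses` vectors, map on first use, clear-then-copy write) can be sketched as below. This is a model only: the "physical page" is a `Vec<u8>`, the virtual addresses are made-up numbers from a hypothetical bump allocator rather than real `mmap` results, and `contents()` is a helper added for inspection.

```rust
/// Model of SafeSharedMemory's bookkeeping (illustrative, not kernel code).
pub struct SharedMemoryModel {
    physical: Vec<u8>,           // stands in for the shared physical page
    pids: Vec<u64>,              // processes that have the page mapped
    virtual_addresses: Vec<u64>, // virtual_addresses[i] is the mapping for pids[i]
    next_va: u64,                // hypothetical bump allocator for new mappings
}

impl SharedMemoryModel {
    /// The creator is recorded first, so its address sits at virtual_addresses[0].
    pub fn new(creator_pid: u64, size: usize) -> Self {
        SharedMemoryModel {
            physical: vec![0; size],
            pids: vec![creator_pid],
            virtual_addresses: vec![0x4000_0000],
            next_va: 0x5000_0000,
        }
    }

    /// Returns the process's virtual address, mapping the page on first use.
    pub fn map_shared_memory_to_process(&mut self, pid: u64) -> u64 {
        if let Some(i) = self.pids.iter().position(|&p| p == pid) {
            return self.virtual_addresses[i];
        }
        let va = self.next_va;
        self.next_va += 0x1000_0000;
        self.pids.push(pid);
        self.virtual_addresses.push(va);
        va
    }

    /// The single copy: validate the size, clear the page, copy the writer's bytes.
    pub fn write(&mut self, pid: u64, data: &[u8]) -> Result<(), &'static str> {
        if data.len() > self.physical.len() {
            return Err("message larger than shared memory");
        }
        self.map_shared_memory_to_process(pid);
        self.physical.fill(0);
        self.physical[..data.len()].copy_from_slice(data);
        Ok(())
    }

    /// Readers get an address into the same page; no second copy is made.
    pub fn read_virtual_address(&mut self, pid: u64) -> u64 {
        self.map_shared_memory_to_process(pid)
    }

    pub fn contents(&self) -> &[u8] {
        &self.physical
    }
}
```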
### TopicDirectoryLayout
**Location**: `kernel/src/ipc/topic/topic_directory.rs`
Global topic registry that maintains all registered topics.
```rust
pub struct TopicDirectoryLayout {
    pub directory: Vec<TopicKernelLayout>, // List of registered topics
}
```
**Global Instance**:
```rust
pub static SPIN_MUTEXED_TOPIC_DIRECTORY: SpinMutex<TopicDirectoryLayout>
```
**Key Design Decisions**:
- Vector-based storage for dynamic topic registration
- Protected by `SpinMutex` for thread-safe access
- Linear search for topic lookup (O(n) complexity)
- No topic removal mechanism (topics persist until process termination)
**Operations**:
- `add_new_topic()`: Register a new topic
- `search_topic()`: Find topic index by name
- `get_topic()`: Get immutable topic reference
- `write_message_to_topic()`: Write message to a topic by name
- `read_message_from_topic()`: Read message from a topic by name
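A sketch of the directory's linear lookup, assuming a simplified `TopicEntry` that holds only the name. `std::sync::Mutex` stands in for the kernel's `SpinMutex`, and returning the existing index on a duplicate name is an assumption based on the "Check if topic exists" step in the creation flow.

```rust
use std::sync::Mutex;

/// Minimal stand-in for TopicKernelLayout: only the name is needed for lookup.
pub struct TopicEntry {
    name: String,
}

impl TopicEntry {
    pub fn name(&self) -> &str {
        &self.name
    }
}

pub struct TopicDirectoryLayout {
    pub directory: Vec<TopicEntry>,
}

impl TopicDirectoryLayout {
    pub const fn new() -> Self {
        TopicDirectoryLayout { directory: Vec::new() }
    }

    /// Linear scan over all registered topics: O(n).
    pub fn search_topic(&self, name: &str) -> Option<usize> {
        self.directory.iter().position(|t| t.name == name)
    }

    /// Registers `name` unless it already exists; returns the topic's index.
    pub fn add_new_topic(&mut self, name: &str) -> usize {
        if let Some(i) = self.search_topic(name) {
            return i;
        }
        self.directory.push(TopicEntry { name: name.to_string() });
        self.directory.len() - 1
    }

    pub fn get_topic(&self, name: &str) -> Option<&TopicEntry> {
        self.search_topic(name).map(|i| &self.directory[i])
    }
}

/// std's Mutex replaces SpinMutex in this user-space sketch.
pub static TOPIC_DIRECTORY: Mutex<TopicDirectoryLayout> = Mutex::new(TopicDirectoryLayout::new());
```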
### Publisher (User-Space)
**Location**: `library/tail_core/src/topic/publisher.rs`
User-space API for publishing messages to topics.
```rust
pub struct Publisher<T: ?Sized> {
    topic_name: [u8; TOPIC_NAME_MAX_LENGTH],
    topic_name_len: usize,
    phantom: core::marker::PhantomData<T>,
}
```
**Key Design Decisions**:
- Generic type parameter for type-safe message publishing
- Stores topic name as fixed-size array for efficiency
- PhantomData ensures type safety without runtime overhead
- Two publish methods: one for sized types, one for unsized types
**Operations**:
- `new()`: Create a new publisher (creates topic if needed)
- `publish()`: Publish a message (requires explicit size)
- `publish_sized()`: Publish a message (auto-calculates size for sized types)
- `topic_name()`: Get the topic name
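The two publish methods differ only in where the size comes from. In this sketch (an assumption about the internals, not the `tail_core` source), `publish()` returns the `(pointer, size)` pair that would be handed to `syscall_publish_to_topic` instead of issuing the syscall, and the `syscall_create_topic` call in `new()` is omitted. `publish_sized()` lives in an `impl` block without `?Sized`, so `size_of::<T>()` is only available for sized types.

```rust
use core::marker::PhantomData;
use core::mem::size_of;

const TOPIC_NAME_MAX_LENGTH: usize = 255;

pub struct Publisher<T: ?Sized> {
    topic_name: [u8; TOPIC_NAME_MAX_LENGTH],
    topic_name_len: usize,
    phantom: PhantomData<T>,
}

impl<T: ?Sized> Publisher<T> {
    /// Panics on an over-long name, matching the documented behavior.
    pub fn new(name: &str) -> Self {
        assert!(name.len() <= TOPIC_NAME_MAX_LENGTH, "topic name too long");
        let mut topic_name = [0u8; TOPIC_NAME_MAX_LENGTH];
        topic_name[..name.len()].copy_from_slice(name.as_bytes());
        // The real constructor would also call syscall_create_topic here (omitted).
        Publisher { topic_name, topic_name_len: name.len(), phantom: PhantomData }
    }

    pub fn topic_name(&self) -> &str {
        // The bytes were copied from a &str, so they are valid UTF-8.
        core::str::from_utf8(&self.topic_name[..self.topic_name_len]).unwrap()
    }

    /// Explicit size: usable for unsized T such as [u8] or str.
    /// Returns what would be placed in x2 (pointer) and x3 (size).
    pub fn publish(&self, message: &T, message_size: usize) -> (*const u8, usize) {
        ((message as *const T).cast::<u8>(), message_size)
    }
}

impl<T> Publisher<T> {
    /// Sized T: the size is known at compile time.
    pub fn publish_sized(&self, message: &T) -> (*const u8, usize) {
        self.publish(message, size_of::<T>())
    }
}
```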
### Subscriber (User-Space)
**Location**: `library/tail_core/src/topic/subscriber.rs`
User-space API for subscribing to topics and receiving messages.
```rust
pub struct Subscriber<T: ?Sized> {
    topic_name: [u8; TOPIC_NAME_MAX_LENGTH],
    phantom: core::marker::PhantomData<T>,
}
```
**Key Design Decisions**:
- Generic type parameter for type-safe message receiving
- Stores topic name as fixed-size array
- Returns raw pointer and size (caller responsible for type casting)
**Operations**:
- `new()`: Create a new subscriber
- `subscribe()`: Subscribe to topic and receive message (blocks if no data)
## Component Interactions
### Topic Creation Flow
```
Publisher Process
│
├──> Publisher::new(topic_name)
│ ├──> Validate topic name length
│ ├──> syscall_create_topic(topic_name)
│ │ ├──> kernel_call_create_topic()
│ │ │ ├──> Lock TopicDirectory
│ │ │ ├──> Check if topic exists
│ │ │ ├──> Create SafeSharedMemory
│ │ │ │ ├──> Allocate memory in creator process
│ │ │ │ └──> Get physical address
│ │ │ ├──> Create TopicKernelLayout
│ │ │ │ ├──> Store topic name
│ │ │ │ ├──> Store shared memory
│ │ │ │ ├──> Initialize empty publisher/subscriber lists
│ │ │ │ └──> Set is_new_data_available = false
│ │ │ ├──> Add to TopicDirectory
│ │ │ └──> Unlock TopicDirectory
│ │ └──> Return to user space
│ └──> Publisher ready to publish
│
└──> Topic is now registered and ready for use
```
### Publish Flow
```
Publisher Process
│
├──> Publisher::publish(message, size)
│ ├──> syscall_publish_to_topic(topic_name, message, size)
│ │ ├──> kernel_call_publish_to_topic()
│ │ │ ├──> Lock TopicDirectory
│ │ │ ├──> Find topic by name
│ │ │ ├──> topic.write(writer_thread, message, size)
│ │ │ │ ├──> Check is_new_data_available
│ │ │ │ │ ├──> If true: return error (block publisher)
│ │ │ │ │ └──> If false: continue
│ │ │ │ ├──> Register publisher thread (if not registered)
│ │ │ │ ├──> Set is_new_data_available = true
│ │ │ │ ├──> Set new_data_size = message_size
│ │ │ │ └──> shared_data.write(writer_pid, message, size)
│ │ │ │ ├──> Map shared memory to writer process (if needed)
│ │ │ │ ├──> Clear shared memory
│ │ │ │ └──> Copy message to shared memory
│ │ │ ├──> If write succeeded:
│ │ │ │ ├──> Get all subscribers
│ │ │ │ └──> Unblock all blocked subscribers
│ │ │ └──> If write failed (data not consumed):
│ │ │ └──> Block publisher thread
│ │ └──> Return to user space
│ └──> Publisher continues (or blocked if data not consumed)
│
└──> Message published (or publisher blocked)
```
### Subscribe Flow
```
Subscriber Process
│
├──> Subscriber::subscribe()
│ ├──> syscall_subscribe_topic(topic_name)
│ │ ├──> kernel_call_subscribe_topic()
│ │ │ ├──> Lock TopicDirectory
│ │ │ ├──> Find topic by name
│ │ │ ├──> topic.read(reader_thread)
│ │ │ │ ├──> Register subscriber thread (if not registered)
│ │ │ │ ├──> Check is_new_data_available
│ │ │ │ │ ├──> If false: return error (block subscriber)
│ │ │ │ │ └──> If true: continue
│ │ │ │ ├──> Set is_new_data_available = false
│ │ │ │ ├──> Map shared memory to reader process (if needed)
│ │ │ │ └──> Return virtual address and size
│ │ │ ├──> If read succeeded:
│ │ │ │ └──> Return (message_ptr, message_size)
│ │ │ └──> If read failed (no data available):
│ │ │ ├──> Block subscriber thread
│ │ │ ├──> Get all publishers
│ │ │ ├──> Unblock all blocked publishers
│ │ │ └──> Return (null_ptr, 0)
│ │ └──> Return to user space
│ └──> Subscriber receives message (or blocked if no data)
│
└──> Message received (or subscriber blocked)
```
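The two handler flows can be modeled together. In this sketch `&mut self` plays the role of holding the `TopicDirectory` lock for the whole call, thread states live in a `HashMap`, `pending: Option<Vec<u8>>` encodes both `is_new_data_available` and the message, and the subscriber receives an owned copy rather than a pointer into shared memory. Wake-ups follow the step order of the two flows above; the sketch does not model how a woken thread re-issues its call. All type and field names are illustrative.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ThreadState {
    Ready,
    TopicPublisherBlocked,
    TopicSubscriberBlocked,
}

#[derive(Debug, PartialEq)]
pub enum PublishOutcome {
    Published,
    Blocked, // previous message not consumed
}

pub struct Topic {
    name: String,
    pending: Option<Vec<u8>>, // Some(..) <=> is_new_data_available == true
    publishers: Vec<u64>,
    subscribers: Vec<u64>,
}

/// `&mut self` stands in for holding the TopicDirectory lock for the whole call.
pub struct Kernel {
    topics: Vec<Topic>,
    states: HashMap<u64, ThreadState>,
}

impl Kernel {
    pub fn new() -> Self {
        Kernel { topics: Vec::new(), states: HashMap::new() }
    }

    pub fn create_topic(&mut self, name: &str) {
        if !self.topics.iter().any(|t| t.name == name) {
            self.topics.push(Topic {
                name: name.to_string(),
                pending: None,
                publishers: Vec::new(),
                subscribers: Vec::new(),
            });
        }
    }

    pub fn state(&self, tid: u64) -> ThreadState {
        *self.states.get(&tid).unwrap_or(&ThreadState::Ready)
    }

    pub fn publish_to_topic(&mut self, caller: u64, name: &str, msg: &[u8]) -> Result<PublishOutcome, &'static str> {
        let topic = self.topics.iter_mut().find(|t| t.name == name).ok_or("Topic not found")?;
        if topic.pending.is_some() {
            self.states.insert(caller, ThreadState::TopicPublisherBlocked);
            return Ok(PublishOutcome::Blocked);
        }
        if !topic.publishers.contains(&caller) {
            topic.publishers.push(caller);
        }
        topic.pending = Some(msg.to_vec());
        // Successful write: wake every blocked subscriber.
        for s in &topic.subscribers {
            if self.states.get(s) == Some(&ThreadState::TopicSubscriberBlocked) {
                self.states.insert(*s, ThreadState::Ready);
            }
        }
        Ok(PublishOutcome::Published)
    }

    /// Ok(None) corresponds to the kernel's (null_ptr, 0) return.
    pub fn subscribe_topic(&mut self, caller: u64, name: &str) -> Result<Option<Vec<u8>>, &'static str> {
        let topic = self.topics.iter_mut().find(|t| t.name == name).ok_or("Topic not found")?;
        if !topic.subscribers.contains(&caller) {
            topic.subscribers.push(caller);
        }
        if let Some(msg) = topic.pending.take() {
            return Ok(Some(msg)); // take() also clears the "new data" condition
        }
        // No data: block the subscriber, then wake blocked publishers.
        self.states.insert(caller, ThreadState::TopicSubscriberBlocked);
        for p in &topic.publishers {
            if self.states.get(p) == Some(&ThreadState::TopicPublisherBlocked) {
                self.states.insert(*p, ThreadState::Ready);
            }
        }
        Ok(None)
    }
}
```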
## Publish/Subscribe Flow
### Normal Operation (Data Available)
```
Time Publisher                    Topic                   Subscriber
 │       │                          │                          │
 │       │ publish(message)         │                          │
 │       ├─────────────────────────>│                          │
 │       │                          │ is_new_data = false      │
 │       │                          │ Write message            │
 │       │                          │ is_new_data = true       │
 │       │                          │                          │
 │       │                          │                          │ subscribe()
 │       │                          │<─────────────────────────┤
 │       │                          │ is_new_data = true       │
 │       │                          │ Read message             │
 │       │                          │ is_new_data = false      │
 │       │                          ├─────────────────────────>│
 │       │                          │                          │ (message_ptr, size)
 │       │                          │                          │
```
### Publisher Blocking (Data Not Consumed)
```
Time Publisher                    Topic                   Subscriber
 │       │                          │                          │
 │       │ publish(message1)        │                          │
 │       ├─────────────────────────>│                          │
 │       │                          │ is_new_data = true       │
 │       │                          │                          │
 │       │ publish(message2)        │                          │
 │       ├─────────────────────────>│                          │
 │       │                          │ is_new_data = true       │
 │       │                          │ Write fails              │
 │       │<─────────────────────────┤                          │
 │       │ [BLOCKED]                │                          │
 │       │                          │                          │
 │       │                          │                          │ subscribe()
 │       │                          │<─────────────────────────┤
 │       │                          │ Read message1            │
 │       │                          │ is_new_data = false      │
 │       │                          ├─────────────────────────>│
 │       │                          │                          │ (message1_ptr, size)
 │       │                          │                          │
 │       │ [UNBLOCKED]              │                          │
 │       │                          │ Write message2           │
 │       │                          │ is_new_data = true       │
 │       │                          │                          │
```
### Subscriber Blocking (No Data Available)
```
Time Publisher                    Topic                   Subscriber
 │       │                          │                          │
 │       │                          │                          │ subscribe()
 │       │                          │<─────────────────────────┤
 │       │                          │ is_new_data = false      │
 │       │                          │ Read fails               │
 │       │                          ├─────────────────────────>│
 │       │                          │                          │ [BLOCKED]
 │       │                          │                          │
 │       │ publish(message)         │                          │
 │       ├─────────────────────────>│                          │
 │       │                          │ Write message            │
 │       │                          │ is_new_data = true       │
 │       │                          │                          │
 │       │                          │                          │ [UNBLOCKED]
 │       │                          ├─────────────────────────>│
 │       │                          │                          │ (message_ptr, size)
 │       │                          │                          │
```
## Memory Management
### Shared Memory Allocation
1. **Topic Creation**:
- Shared memory is allocated in the creator process's address space
- Physical address is obtained and stored in `SafeSharedMemory`
- Virtual address in creator process is stored in `virtual_addresses[0]`
2. **Memory Mapping**:
- When a new process (publisher or subscriber) accesses the topic:
- Check if process is already registered in `pids` vector
- If not registered, map shared memory to the process
- Add process PID and virtual address to vectors
- Mapping uses `mmap` with physical address translation
3. **Memory Layout**:
```
Publisher Process Shared Memory (Physical)
┌─────────────────┐ ┌─────────────────────────────┐
│ Message Buffer │──memcpy──> │ Message Data (up to size) │
│ (user space) │ │ (shared via single copy) │
└─────────────────┘ └─────────────────────────────┘
│
│ Maps to
Subscriber Process │
┌─────────────────┐ │
│ Virtual Addr B │<───────────────────────────────┘
└─────────────────┘
```
**Copy Operation**:
- Publisher writes message from its buffer to shared memory (one `memcpy` in kernel)
- Subscriber reads directly from shared memory (no additional copy)
- Total: One copy operation per message (from publisher buffer to shared memory)
### Memory Safety
- **Single Copy**: Messages are copied once from publisher buffer to shared memory
- **Process Isolation**: Each process has its own virtual address mapping
- **Size Validation**: Message size is validated against shared memory size
- **Memory Clearing**: Shared memory is cleared before writing new messages
- **Single Message**: Only one message can be stored at a time (prevents overwrites)
## Thread State Management
### Thread States
**Publisher States**:
- `ThreadStateReady`: Publisher can publish
- `ThreadStateTopicPublisherBlocked`: Publisher blocked (data not consumed)
**Subscriber States**:
- `ThreadStateReady`: Subscriber can subscribe
- `ThreadStateTopicSubscriberBlocked`: Subscriber blocked (no data available)
### Blocking/Unblocking Logic
**Publisher Blocking**:
- Occurs when `is_new_data_available == true` during write
- Thread state set to `ThreadStateTopicPublisherBlocked`
- Thread moved to block queue
- Unblocked when subscriber reads the message
**Subscriber Blocking**:
- Occurs when `is_new_data_available == false` during read
- Thread state set to `ThreadStateTopicSubscriberBlocked`
- Thread moved to block queue
- Unblocked when publisher writes a new message
**Wake-up Logic**:
- After successful write: Unblock all blocked subscribers
- After successful read: Unblock all blocked publishers
- Uses `Scheduler::unblock()` to move threads to the ready queue
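The block-queue/ready-queue movement can be sketched with a toy scheduler. Only `Scheduler::unblock()` is named in this document; `spawn()`, `block()`, `unblock_all()`, `state_of()` and the queue representation are hypothetical.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ThreadState {
    Ready,
    TopicPublisherBlocked,
    TopicSubscriberBlocked,
}

pub struct Thread {
    pub id: u64,
    pub state: ThreadState,
}

/// Hypothetical scheduler with a ready queue and a block queue.
pub struct Scheduler {
    ready: VecDeque<Thread>,
    blocked: Vec<Thread>,
}

impl Scheduler {
    pub fn new() -> Self {
        Scheduler { ready: VecDeque::new(), blocked: Vec::new() }
    }

    pub fn spawn(&mut self, id: u64) {
        self.ready.push_back(Thread { id, state: ThreadState::Ready });
    }

    /// Moves a ready thread to the block queue with the given blocked state.
    pub fn block(&mut self, id: u64, state: ThreadState) {
        if let Some(pos) = self.ready.iter().position(|t| t.id == id) {
            let mut t = self.ready.remove(pos).unwrap();
            t.state = state;
            self.blocked.push(t);
        }
    }

    /// Moves a blocked thread back to the ready queue; no-op if it is not blocked.
    pub fn unblock(&mut self, id: u64) {
        if let Some(pos) = self.blocked.iter().position(|t| t.id == id) {
            let mut t = self.blocked.swap_remove(pos);
            t.state = ThreadState::Ready;
            self.ready.push_back(t);
        }
    }

    /// Wakes every thread in `ids` that is blocked in `waiting_in`.
    pub fn unblock_all(&mut self, ids: &[u64], waiting_in: ThreadState) {
        for &id in ids {
            let is_waiting = self.blocked.iter().any(|t| t.id == id && t.state == waiting_in);
            if is_waiting {
                self.unblock(id);
            }
        }
    }

    pub fn state_of(&self, id: u64) -> Option<ThreadState> {
        self.ready.iter().chain(self.blocked.iter()).find(|t| t.id == id).map(|t| t.state)
    }
}
```

After a successful write, the handler would call something like `unblock_all(&subscribers, ThreadState::TopicSubscriberBlocked)`, so threads blocked for a different reason are left alone.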
## Synchronization and Concurrency
### SpinMutex Protection
The `TopicDirectory` is protected by a `SpinMutex`:
```rust
pub static SPIN_MUTEXED_TOPIC_DIRECTORY: SpinMutex<TopicDirectoryLayout>
```
**Critical Sections**:
- Topic creation
- Topic lookup
- Message publishing
- Message subscription
**Lock Ordering**:
- Always lock `TopicDirectory` before accessing topics
- Lock is held for the entire operation (lookup + read/write)
- Prevents race conditions in multi-threaded scenarios
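Why one lock held across lookup and read/write is enough can be seen from a minimal spin lock. The sketch below is a generic test-and-set `SpinMutex` built on `AtomicBool::compare_exchange_weak` with an RAII guard, not the kernel's implementation; it does not disable interrupts, which a kernel lock may also need to do. Dropping the guard corresponds to the "Unlock TopicDirectory" step.

```rust
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};

pub struct SpinMutex<T> {
    locked: AtomicBool,
    data: UnsafeCell<T>,
}

// SAFETY: access to `data` is serialized by `locked`.
unsafe impl<T: Send> Sync for SpinMutex<T> {}

pub struct SpinMutexGuard<'a, T> {
    mutex: &'a SpinMutex<T>,
}

impl<T> SpinMutex<T> {
    pub const fn new(value: T) -> Self {
        SpinMutex { locked: AtomicBool::new(false), data: UnsafeCell::new(value) }
    }

    pub fn lock(&self) -> SpinMutexGuard<'_, T> {
        // Spin until this thread flips `locked` from false to true.
        while self
            .locked
            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            std::hint::spin_loop();
        }
        SpinMutexGuard { mutex: self }
    }
}

impl<T> Deref for SpinMutexGuard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: the guard proves the lock is held.
        unsafe { &*self.mutex.data.get() }
    }
}

impl<T> DerefMut for SpinMutexGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: the guard proves the lock is held exclusively.
        unsafe { &mut *self.mutex.data.get() }
    }
}

impl<T> Drop for SpinMutexGuard<'_, T> {
    fn drop(&mut self) {
        // Release pairs with the Acquire in lock(), publishing our writes.
        self.mutex.locked.store(false, Ordering::Release);
    }
}
```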
### Concurrency Guarantees
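- **Flag Consistency**: `is_new_data_available` is a plain `bool`, not an atomic; it stays consistent because it is only read and written while the `TopicDirectory` lock is held
- **Single Writer**: Only one message can be pending in a topic at a time
- **Multiple Readers**: Multiple subscribers can be registered, but the first successful read clears `is_new_data_available`, so each message is delivered to only one subscriber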
- **Thread Safety**: All operations are protected by SpinMutex
## Error Handling
### Error Conditions
1. **Topic Not Found**:
- Error: "Topic not found"
- Occurs when publishing/subscribing to non-existent topic
- The topic must be created first (`Publisher::new()` creates it if needed)
2. **Data Not Consumed**:
- Error: "TopicKernelLayout::write() failed. There is data not read yet"
- Occurs when publisher tries to write while previous message not read
- Publisher thread is blocked
3. **No Data Available**:
- Error: "TopicKernelLayout::read() failed. There is no new data available"
- Occurs when subscriber tries to read but no message available
- Subscriber thread is blocked
4. **Memory Allocation Failure**:
- Error: Returned from `SafeSharedMemory::new()`
- Occurs during topic creation
- Topic creation fails
### Error Propagation
- Kernel call handlers return errors via return values
- User-space API may panic on invalid input (e.g., topic name too long)
- The "data not consumed" and "no data available" conditions are not fatal: the handler blocks the calling thread rather than failing the call
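The error conditions can be collected into one type. The variant names and the `blocks_caller()` helper are hypothetical; the first three messages are the ones quoted above, while the allocation-failure text is a placeholder because the document does not give it.

```rust
use std::fmt;

#[derive(Debug, PartialEq)]
pub enum TopicError {
    TopicNotFound,
    DataNotConsumed,
    NoDataAvailable,
    AllocationFailed,
}

impl TopicError {
    /// Synchronization conditions are handled by blocking, not by failing the call.
    pub fn blocks_caller(&self) -> bool {
        matches!(self, TopicError::DataNotConsumed | TopicError::NoDataAvailable)
    }
}

impl fmt::Display for TopicError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let msg = match self {
            TopicError::TopicNotFound => "Topic not found",
            TopicError::DataNotConsumed => {
                "TopicKernelLayout::write() failed. There is data not read yet"
            }
            TopicError::NoDataAvailable => {
                "TopicKernelLayout::read() failed. There is no new data available"
            }
            TopicError::AllocationFailed => "SafeSharedMemory::new() failed", // placeholder
        };
        f.write_str(msg)
    }
}
```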
## Kernel Call Handlers
### kernel_call_create_topic
**Location**: `kernel/src/kernel_call/kernel_call_create_topic.rs`
**Function Signature**:
```rust
pub fn kernel_call_create_topic(
    caller_thread: Arc<RefCell<Thread>>,
    topic_name_ptr: *const u8,
    topic_name_length: usize,
)
```
**Operations**:
1. Convert topic name from byte slice to string
2. Get caller process
3. Lock TopicDirectory
4. Create SafeSharedMemory (allocates in caller process)
5. Create TopicKernelLayout
6. Add to TopicDirectory
7. Unlock TopicDirectory
**Kernel Call Number**: `KERNEL_CALL_CREATE_TOPIC = 2`
### kernel_call_publish_to_topic
**Location**: `kernel/src/kernel_call/kernel_call_publish_to_topic.rs`
**Function Signature**:
```rust
pub fn kernel_call_publish_to_topic(
    caller_thread: Arc<RefCell<Thread>>,
    topic_name_ptr: *const u8,
    topic_name_length: usize,
    message: *const u8,
    message_size: usize,
)
```
**Operations**:
1. Convert topic name from byte slice to string
2. Lock TopicDirectory
3. Find topic by name
4. Write message to topic
5. If write succeeds: Unblock all blocked subscribers
6. If write fails: Block publisher thread
7. Unlock TopicDirectory
**Kernel Call Number**: `KERNEL_CALL_PUBLISH_TOPIC = 1`
### kernel_call_subscribe_topic
**Location**: `kernel/src/kernel_call/kernel_call_subscribe_topic.rs`
**Function Signature**:
```rust
pub fn kernel_call_subscribe_topic(
    receiver_thread: Arc<RefCell<Thread>>,
    topic_name_ptr: *const u8,
    topic_name_length: usize,
) -> (*const u8, usize)
```
**Operations**:
1. Convert topic name from byte slice to string
2. Lock TopicDirectory
3. Find topic by name
4. Read message from topic
5. If read succeeds: Return (message_ptr, message_size)
6. If read fails: Block subscriber thread, unblock blocked publishers, return (null, 0)
7. Unlock TopicDirectory
**Kernel Call Number**: `KERNEL_CALL_SUBSCRIBE_TOPIC = 3`
## User-Space API
### Publisher API
**Creating a Publisher**:
```rust
let publisher = Publisher::<MyMessage>::new("my_topic");
```
**Publishing a Message**:
```rust
// For sized types
publisher.publish_sized(&message);
// For unsized types
publisher.publish(&message, message_size);
```
### Subscriber API
**Creating a Subscriber**:
```rust
let subscriber = Subscriber::<MyMessage>::new("my_topic");
```
**Subscribing to Messages**:
```rust
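let (message_ptr, message_size) = subscriber.subscribe();
// Check the result before casting it to the message type
if !message_ptr.is_null() && message_size >= core::mem::size_of::<MyMessage>() {
    // SAFETY: the kernel mapped `message_size` readable bytes at `message_ptr`
    let message = unsafe { &*message_ptr.cast::<MyMessage>() };
}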
```
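Because `subscribe()` hands back a raw pointer and size, a checked cast avoids the most common mistakes. The helper below is hypothetical (not part of `tail_core`): it rejects a null pointer, a size smaller than `T`, and a misaligned address before reinterpreting the bytes. It remains `unsafe` because the caller must still guarantee that the bytes form a valid `T` and stay unchanged while borrowed; under the single-buffer design described here, the next successful publish clears and rewrites the same page, so a reference obtained this way should not be held across it.

```rust
use core::mem::{align_of, size_of};

/// Returns a typed view only if the pointer is non-null, large enough, and aligned.
///
/// # Safety
/// `ptr` must point to at least `size` readable bytes that form a valid `T`
/// and are not modified for the lifetime `'a`.
pub unsafe fn view_message<'a, T>(ptr: *const u8, size: usize) -> Option<&'a T> {
    if ptr.is_null() || size < size_of::<T>() || (ptr as usize) % align_of::<T>() != 0 {
        return None;
    }
    // SAFETY: checks above plus the caller's guarantees.
    unsafe { Some(&*ptr.cast::<T>()) }
}
```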
### Syscall Interface
**syscall_create_topic**:
```rust
pub fn syscall_create_topic<T: ?Sized>(topic: &str)
```
**syscall_publish_to_topic**:
```rust
pub fn syscall_publish_to_topic<T: ?Sized>(
    topic: &str,
    message: &T,
    message_size: usize,
)
```
**syscall_subscribe_topic**:
```rust
pub fn syscall_subscribe_topic(topic: &str) -> (*const u8, usize)
```
### Register Layout
**create_topic**:
- x0: topic name pointer
- x1: topic name length
- x8: syscall number (#2)
**publish_to_topic**:
- x0: topic name pointer
- x1: topic name length
- x2: message pointer
- x3: message size
- x8: syscall number (#1)
**subscribe_topic**:
- x0: topic name pointer
- x1: topic name length
- x8: syscall number (#3)
- Returns: x0 = message pointer, x1 = message size
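The register layout amounts to a decode step in the trap handler. The `TrapFrame` type, the `TopicCall` enum and the helper names below are hypothetical; only the call numbers and register assignments come from this document.

```rust
/// Hypothetical snapshot of x0..x8 at the trap.
pub struct TrapFrame {
    pub x: [u64; 9],
}

pub const KERNEL_CALL_PUBLISH_TOPIC: u64 = 1;
pub const KERNEL_CALL_CREATE_TOPIC: u64 = 2;
pub const KERNEL_CALL_SUBSCRIBE_TOPIC: u64 = 3;

#[derive(Debug, PartialEq)]
pub enum TopicCall {
    Create { name_ptr: u64, name_len: u64 },
    Publish { name_ptr: u64, name_len: u64, msg_ptr: u64, msg_size: u64 },
    Subscribe { name_ptr: u64, name_len: u64 },
}

/// x8 selects the call; x0..x3 carry the arguments.
pub fn decode(frame: &TrapFrame) -> Option<TopicCall> {
    let x = &frame.x;
    match x[8] {
        KERNEL_CALL_PUBLISH_TOPIC => Some(TopicCall::Publish {
            name_ptr: x[0],
            name_len: x[1],
            msg_ptr: x[2],
            msg_size: x[3],
        }),
        KERNEL_CALL_CREATE_TOPIC => Some(TopicCall::Create { name_ptr: x[0], name_len: x[1] }),
        KERNEL_CALL_SUBSCRIBE_TOPIC => Some(TopicCall::Subscribe { name_ptr: x[0], name_len: x[1] }),
        _ => None,
    }
}

/// subscribe_topic returns the message pointer in x0 and the size in x1.
pub fn write_subscribe_result(frame: &mut TrapFrame, ptr: u64, size: u64) {
    frame.x[0] = ptr;
    frame.x[1] = size;
}
```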
## Implementation Notes
### Topic Name Storage
- Topic names are stored as fixed-size byte arrays (`[u8; TOPIC_NAME_MAX_LENGTH]`)
- Maximum length: 255 bytes
- Names are not null-terminated (Rust strings don't require null termination)
- UTF-8 encoding is validated when converting to string
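Storing and recovering a name could look like the following sketch. `store_name()` and `name_from_bytes()` are hypothetical helpers. In a handler, the byte slice would presumably come from `core::slice::from_raw_parts(topic_name_ptr, topic_name_length)` after the pointer has been validated.

```rust
const TOPIC_NAME_MAX_LENGTH: usize = 255;

/// Copies a name into a fixed-size buffer (no null terminator) and records its length.
pub fn store_name(name: &str) -> Option<([u8; TOPIC_NAME_MAX_LENGTH], usize)> {
    let bytes = name.as_bytes();
    if bytes.len() > TOPIC_NAME_MAX_LENGTH {
        return None;
    }
    let mut buf = [0u8; TOPIC_NAME_MAX_LENGTH];
    buf[..bytes.len()].copy_from_slice(bytes);
    Some((buf, bytes.len()))
}

/// Validates UTF-8 when turning the received bytes back into a &str.
pub fn name_from_bytes(bytes: &[u8]) -> Result<&str, core::str::Utf8Error> {
    core::str::from_utf8(bytes)
}
```

Because the buffer is not null-terminated, the stored length is what delimits the name; slicing with `&buf[..len]` recovers exactly the original bytes.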
### Message Size
- Message size is specified during topic creation (`TOPIC_NAME_MAX_LENGTH` by default)
- Shared memory is allocated with this size
- Messages larger than the allocated size will fail
- Messages smaller than the allocated size are allowed
### Thread Registration
- Publisher and subscriber threads are automatically registered on first use
- Thread references are stored in `publishers` and `subscribers` vectors
- Registration allows automatic wake-up when data becomes available
- Threads remain registered until process termination
### Single Message Buffer
- Only one message can be stored in a topic at a time
- This design simplifies memory management
- Publishers block if previous message not consumed
- Subscribers block if no message available
- Ensures no message loss or overwrites
### Single-Copy Design
- Messages are copied once from publisher's buffer to shared memory
- The kernel performs a `memcpy` operation during `SafeSharedMemory::write()`
- Physical memory is shared across processes (one physical page)
- Each process has its own virtual address mapping to the same physical page
- Subscribers read directly from shared memory (no additional copy)
- Reduces memory overhead compared to multiple copies, but not true zero-copy
## Future Improvements
### Potential Enhancements
1. **Message Queue**: Support multiple messages per topic
2. **Topic Removal**: Allow topics to be removed when no longer needed
3. **Message Filtering**: Support message filtering for subscribers
4. **Topic Discovery**: Provide mechanism to list all available topics
5. **Message Persistence**: Option to persist messages across process restarts
6. **Priority Support**: Support message priority for subscribers
7. **Topic Permissions**: Access control for topic creation/access
### Performance Considerations
- Current implementation uses linear search for topic lookup (O(n))
- Could be improved with hash table for O(1) lookup
- Single message buffer limits throughput
- Thread wake-up could be optimized for many subscribers
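One way to get O(1) average lookup is to keep a name-to-index map next to the existing vector, sketched below with hypothetical names. `std::collections::HashMap` requires `std`; in a `no_std` kernel with `alloc`, `alloc::collections::BTreeMap` is available and gives O(log n) lookup instead. Topic removal would additionally need index maintenance, which the sketch does not cover.

```rust
use std::collections::HashMap;

/// Hypothetical directory with a name -> index map beside the topic vector.
pub struct IndexedDirectory<T> {
    topics: Vec<T>,
    index: HashMap<String, usize>,
}

impl<T> IndexedDirectory<T> {
    pub fn new() -> Self {
        IndexedDirectory { topics: Vec::new(), index: HashMap::new() }
    }

    /// Inserts a topic if the name is new; returns its index either way.
    pub fn add(&mut self, name: &str, topic: T) -> usize {
        if let Some(&i) = self.index.get(name) {
            return i;
        }
        self.topics.push(topic);
        let i = self.topics.len() - 1;
        self.index.insert(name.to_string(), i);
        i
    }

    /// Average O(1) hash lookup instead of a linear scan.
    pub fn get(&self, name: &str) -> Option<&T> {
        self.index.get(name).map(|&i| &self.topics[i])
    }
}
```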