repartir/lib.rs
//! # Repartir: Sovereign AI-Grade Distributed Computing
//!
//! Repartir is a pure Rust library for distributed execution across CPUs, GPUs,
//! and remote machines. It is built on the **Iron Lotus Framework** and
//! validated by the **certeza** testing methodology.
6//!
//! ## Features
//!
//! - **100% Rust, Zero C/C++**: True digital sovereignty through complete auditability
//! - **Memory Safety Guaranteed**: Provably safe via `RustBelt` formal verification
//! - **Work-Stealing Scheduler**: Based on Blumofe & Leiserson (1999)
//! - **Supply Chain Security**: Dependency pinning, binary signing, license enforcement
//! - **Iron Lotus Quality**: ≥95% coverage, ≥80% mutation score, formal verification
14//!
//! ## Quick Start
//!
//! ```no_run
//! use repartir::{Pool, task::{Task, Backend}};
//!
//! #[tokio::main]
//! async fn main() -> repartir::error::Result<()> {
//!     // Create a pool with a CPU executor
//!     let pool = Pool::builder()
//!         .cpu_workers(8)
//!         .build()?; // Note: build() is sync, not async
//!
//!     // Submit a task
//!     let task = Task::builder()
//!         .binary("./worker")
//!         .arg("--input")
//!         .arg("data.csv")
//!         .backend(Backend::Cpu)
//!         .build()?;
//!
//!     let result = pool.submit(task).await?;
//!
//!     if result.is_success() {
//!         println!("Task completed: {}", result.stdout_str()?);
//!     }
//!
//!     Ok(())
//! }
//! ```
44//!
//! ## Architecture
//!
//! Repartir follows a clean, layered architecture:
//!
//! - **Task**: Unit of work (binary + args + env)
//! - **Executor**: Runs tasks on a specific backend (CPU, GPU, Remote)
//! - **Scheduler**: Distributes tasks across executors (work-stealing)
//! - **Pool**: High-level API for task submission and result retrieval
53//!
//! ## Toyota Way Principles
//!
//! Repartir embodies the **Iron Lotus Framework**:
//!
//! - **Genchi Genbutsu (現地現物)**: Transparent execution, traceable to source
//! - **Jidoka (自働化)**: Automated quality gates (CI enforces ≥95% coverage)
//! - **Kaizen (改善)**: Continuous improvement (TDG ratchet, mutation testing)
//! - **Muda (無駄)**: Waste elimination (zero-copy, single language, no FFI)
62//!
//! ## Certeza Testing
//!
//! Three-tiered testing for asymptotic effectiveness:
//!
//! - **Tier 1 (ON-SAVE)**: Near-instant feedback (<3s)
//! - **Tier 2 (ON-COMMIT)**: Comprehensive (1-5min, 95% coverage)
//! - **Tier 3 (ON-MERGE)**: Exhaustive (hours, 80% mutation score)
70
71#![warn(missing_docs)]
72#![warn(clippy::all)]
73#![warn(clippy::pedantic)]
74#![warn(clippy::nursery)]
75#![deny(unsafe_code)] // v1.0: No unsafe code (Sovereign AI requirement)
76
77pub mod checkpoint;
78pub mod error;
79pub mod executor;
80pub mod messaging;
81pub mod scheduler;
82#[cfg(feature = "serverless")]
83pub mod serverless;
84pub mod task;
85#[cfg(feature = "tensor")]
86pub mod tensor;
87#[cfg(feature = "tui")]
88pub mod tui;
89
90use error::{RepartirError, Result};
91use executor::cpu::CpuExecutor;
92use executor::Executor;
93use scheduler::Scheduler;
94use std::sync::Arc;
95use task::{ExecutionResult, Task};
96use tokio::sync::RwLock;
97use tokio::task::JoinSet;
98use tracing::{debug, info};
99
100/// High-level API for distributed task execution.
101///
102/// The `Pool` manages executors, schedules tasks, and coordinates results.
103///
104/// # Example
105///
106/// ```no_run
107/// use repartir::Pool;
108///
109/// #[tokio::main]
110/// async fn main() -> repartir::error::Result<()> {
111/// let pool = Pool::builder()
112/// .cpu_workers(4)
113/// .build()?;
114///
115/// println!("Pool ready with {} workers", pool.capacity());
116/// Ok(())
117/// }
118/// ```
119pub struct Pool {
120 /// Task scheduler.
121 scheduler: Arc<Scheduler>,
122 /// CPU executor.
123 #[allow(dead_code)] // v1.0: not directly accessed, used by workers
124 cpu_executor: Option<Arc<CpuExecutor>>,
125 /// Number of worker tasks.
126 num_workers: usize,
127 /// Active worker handles.
128 workers: Arc<RwLock<JoinSet<()>>>,
129}
130
131impl Pool {
132 /// Creates a new pool builder.
133 #[must_use]
134 pub fn builder() -> PoolBuilder {
135 PoolBuilder::default()
136 }
137
138 /// Submits a task for execution.
139 ///
140 /// # Errors
141 ///
142 /// Returns an error if:
143 /// - The task is invalid
144 /// - The queue is full
145 /// - Execution fails
146 pub async fn submit(&self, task: Task) -> Result<ExecutionResult> {
147 let task_id = self.scheduler.submit(task).await?;
148 debug!("Task {task_id} submitted");
149
150 // Wait for result (polling-based for v1.0, event-driven in v1.1)
151 loop {
152 if let Some(result) = self.scheduler.get_result(task_id).await {
153 self.scheduler.remove_result(task_id).await;
154 return Ok(result);
155 }
156
157 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
158 }
159 }
160
161 /// Returns the total capacity (number of workers).
162 #[must_use]
163 pub const fn capacity(&self) -> usize {
164 self.num_workers
165 }
166
167 /// Returns the number of pending tasks.
168 pub async fn pending_tasks(&self) -> usize {
169 self.scheduler.pending_count().await
170 }
171
172 /// Shuts down the pool gracefully.
173 ///
174 /// Waits for all pending tasks to complete.
175 pub async fn shutdown(self) {
176 info!("Shutting down pool");
177 self.scheduler.clear().await;
178 self.workers.write().await.shutdown().await;
179 }
180}
181
/// Builder for `Pool`.
///
/// Both settings are optional at the type level; unset values are resolved
/// when the pool is built.
#[derive(Debug, Clone, Default)]
pub struct PoolBuilder {
    /// Requested number of CPU workers, if one was set.
    cpu_workers: Option<usize>,
    /// Requested maximum scheduler queue size, if one was set.
    max_queue_size: Option<usize>,
}
188
189impl PoolBuilder {
190 /// Sets the number of CPU workers.
191 #[must_use]
192 pub const fn cpu_workers(mut self, count: usize) -> Self {
193 self.cpu_workers = Some(count);
194 self
195 }
196
197 /// Sets the maximum queue size.
198 #[must_use]
199 pub const fn max_queue_size(mut self, size: usize) -> Self {
200 self.max_queue_size = Some(size);
201 self
202 }
203
204 /// Builds the pool and starts workers.
205 ///
206 /// # Errors
207 ///
208 /// Returns an error if no workers are configured.
209 ///
210 /// # Panics
211 ///
212 /// Will panic if `cpu_executor` is None (should never happen).
213 pub fn build(self) -> Result<Pool> {
214 let cpu_workers = self.cpu_workers.unwrap_or(0);
215
216 if cpu_workers == 0 {
217 return Err(RepartirError::InvalidTask {
218 reason: "At least one worker type must be configured".to_string(),
219 });
220 }
221
222 let max_queue_size = self.max_queue_size.unwrap_or(10_000);
223 let scheduler = Arc::new(Scheduler::with_capacity(max_queue_size));
224 let cpu_executor = Arc::new(CpuExecutor::new());
225
226 info!("Initializing pool with {cpu_workers} CPU workers");
227
228 // Spawn worker tasks
229 let mut workers = JoinSet::new();
230 for worker_id in 0..cpu_workers {
231 let scheduler = Arc::clone(&scheduler);
232 let executor = Arc::clone(&cpu_executor);
233
234 workers.spawn(async move {
235 debug!("Worker {worker_id} started");
236 loop {
237 // Poll for tasks
238 if let Some(task) = scheduler.next_task().await {
239 match executor.execute(task).await {
240 Ok(result) => {
241 scheduler.store_result(result).await;
242 }
243 Err(e) => {
244 debug!("Task execution failed: {e}");
245 }
246 }
247 } else {
248 // No tasks, sleep briefly
249 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
250 }
251 }
252 });
253 }
254
255 Ok(Pool {
256 scheduler,
257 cpu_executor: Some(cpu_executor),
258 num_workers: cpu_workers,
259 workers: Arc::new(RwLock::new(workers)),
260 })
261 }
262}
263
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    // `Backend` is only referenced by the Unix-only part of
    // `test_pool_submit_task`; gating the import avoids an unused-import
    // warning on other targets.
    #[cfg(unix)]
    use task::Backend;

    /// A configured builder yields a pool with the requested capacity and an
    /// empty queue.
    #[tokio::test]
    async fn test_pool_builder() {
        let pool = Pool::builder().cpu_workers(2).max_queue_size(100).build();

        assert!(pool.is_ok());
        let p = pool.expect("Pool build should succeed");
        assert_eq!(p.capacity(), 2);
        assert_eq!(p.pending_tasks().await, 0);

        p.shutdown().await;
    }

    /// Submitting `/bin/echo test` returns a successful result whose stdout
    /// is `test` (Unix only; elsewhere the pool is just built and shut down).
    #[tokio::test]
    async fn test_pool_submit_task() {
        let pool = Pool::builder()
            .cpu_workers(2)
            .build()
            .expect("Pool build should succeed");

        #[cfg(unix)]
        {
            let task = Task::builder()
                .binary("/bin/echo")
                .arg("test")
                .backend(Backend::Cpu)
                .build()
                .expect("Task build should succeed");

            let result = pool.submit(task).await;
            assert!(result.is_ok());

            let exec_result = result.expect("Task execution should succeed");
            assert!(exec_result.is_success());
            assert_eq!(
                exec_result
                    .stdout_str()
                    .expect("stdout should be UTF-8")
                    .trim(),
                "test"
            );
        }

        pool.shutdown().await;
    }

    /// Building without any configured workers is rejected.
    #[tokio::test]
    async fn test_pool_no_workers_error() {
        let pool = Pool::builder().build();
        assert!(pool.is_err());
    }
}