repartir/lib.rs

//! # Repartir: Sovereign AI-Grade Distributed Computing
//!
//! Repartir is a pure Rust library for distributed execution across CPUs, GPUs,
//! and remote machines. Built on the **Iron Lotus Framework** and validated by
//! the **certeza** testing methodology.
//!
//! ## Features
//!
//! - **100% Rust, Zero C/C++**: True digital sovereignty through complete auditability
//! - **Memory Safety Guaranteed**: No `unsafe` code (`#![deny(unsafe_code)]`), resting on the type-system guarantees formalized by `RustBelt`
//! - **Work-Stealing Scheduler**: Based on Blumofe & Leiserson (1999)
//! - **Supply Chain Security**: Dependency pinning, binary signing, license enforcement
//! - **Iron Lotus Quality**: ≥95% coverage, ≥80% mutation score, formal verification
//!
//! ## Quick Start
//!
//! ```no_run
//! use repartir::{Pool, task::{Task, Backend}};
//!
//! #[tokio::main]
//! async fn main() -> repartir::error::Result<()> {
//!     // Create a pool with CPU executor
//!     let pool = Pool::builder()
//!         .cpu_workers(8)
//!         .build()?; // Note: build() is sync, not async
//!
//!     // Submit a task
//!     let task = Task::builder()
//!         .binary("./worker")
//!         .arg("--input")
//!         .arg("data.csv")
//!         .backend(Backend::Cpu)
//!         .build()?;
//!
//!     let result = pool.submit(task).await?;
//!
//!     if result.is_success() {
//!         println!("Task completed: {}", result.stdout_str()?);
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! ## Architecture
//!
//! Repartir follows a clean, layered architecture:
//!
//! - **Task**: Unit of work (binary + args + env)
//! - **Executor**: Runs tasks on a specific backend (CPU, GPU, Remote); a direct-use sketch follows this list
//! - **Scheduler**: Distributes tasks across executors (work-stealing)
//! - **Pool**: High-level API for task submission and result retrieval
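//!
//! Each layer can also be driven on its own. Below is a minimal sketch of using
//! the `Executor` layer directly, bypassing the scheduler. It assumes the
//! `cpu::CpuExecutor` type and `Executor::execute` are publicly reachable and
//! that `execute` yields the task's `ExecutionResult` (as the pool's worker loop
//! does); the `Pool` API above remains the supported entry point.
//!
//! ```ignore
//! use repartir::executor::{cpu::CpuExecutor, Executor};
//! use repartir::task::{Backend, Task};
//!
//! # #[tokio::main]
//! # async fn main() -> repartir::error::Result<()> {
//! let executor = CpuExecutor::new();
//! let task = Task::builder()
//!     .binary("/bin/echo")
//!     .arg("hello")
//!     .backend(Backend::Cpu)
//!     .build()?;
//!
//! // Run the task on the CPU backend without going through the Pool/Scheduler.
//! let result = executor.execute(task).await?;
//! assert!(result.is_success());
//! # Ok(())
//! # }
//! ```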
//!
//! ## Toyota Way Principles
//!
//! Repartir embodies the **Iron Lotus Framework**:
//!
//! - **Genchi Genbutsu (現地現物)**: Transparent execution, traceable to source
//! - **Jidoka (自働化)**: Automated quality gates (CI enforces ≥95% coverage)
//! - **Kaizen (改善)**: Continuous improvement (TDG ratchet, mutation testing)
//! - **Muda (無駄)**: Waste elimination (zero-copy, single language, no FFI)
//!
//! ## Certeza Testing
//!
//! Three-tiered testing for asymptotic effectiveness:
//!
//! - **Tier 1 (ON-SAVE)**: Near-instant feedback (<3s)
//! - **Tier 2 (ON-COMMIT)**: Comprehensive (1-5min, 95% coverage)
//! - **Tier 3 (ON-MERGE)**: Exhaustive (hours, 80% mutation score)

#![warn(missing_docs)]
#![warn(clippy::all)]
#![warn(clippy::pedantic)]
#![warn(clippy::nursery)]
#![deny(unsafe_code)] // v1.0: No unsafe code (Sovereign AI requirement)

pub mod checkpoint;
pub mod error;
pub mod executor;
pub mod messaging;
pub mod scheduler;
#[cfg(feature = "serverless")]
pub mod serverless;
pub mod task;
#[cfg(feature = "tensor")]
pub mod tensor;
#[cfg(feature = "tui")]
pub mod tui;

use error::{RepartirError, Result};
use executor::cpu::CpuExecutor;
use executor::Executor;
use scheduler::Scheduler;
use std::sync::Arc;
use task::{ExecutionResult, Task};
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tracing::{debug, info};

/// High-level API for distributed task execution.
///
/// The `Pool` manages executors, schedules tasks, and coordinates results.
///
/// # Example
///
/// ```no_run
/// use repartir::Pool;
///
/// #[tokio::main]
/// async fn main() -> repartir::error::Result<()> {
///     let pool = Pool::builder()
///         .cpu_workers(4)
///         .build()?;
///
///     println!("Pool ready with {} workers", pool.capacity());
///     Ok(())
/// }
/// ```
pub struct Pool {
    /// Task scheduler.
    scheduler: Arc<Scheduler>,
    /// CPU executor.
    #[allow(dead_code)] // v1.0: not directly accessed, used by workers
    cpu_executor: Option<Arc<CpuExecutor>>,
    /// Number of worker tasks.
    num_workers: usize,
    /// Active worker handles.
    workers: Arc<RwLock<JoinSet<()>>>,
}

impl Pool {
    /// Creates a new pool builder.
    #[must_use]
    pub fn builder() -> PoolBuilder {
        PoolBuilder::default()
    }

    /// Submits a task for execution.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The task is invalid
    /// - The queue is full
    /// - Execution fails
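    ///
    /// # Example
    ///
    /// A minimal sketch mirroring the crate-level Quick Start (the echoed
    /// binary and arguments are illustrative only):
    ///
    /// ```no_run
    /// use repartir::{Pool, task::{Task, Backend}};
    ///
    /// # #[tokio::main]
    /// # async fn main() -> repartir::error::Result<()> {
    /// let pool = Pool::builder().cpu_workers(2).build()?;
    ///
    /// let task = Task::builder()
    ///     .binary("/bin/echo")
    ///     .arg("hello")
    ///     .backend(Backend::Cpu)
    ///     .build()?;
    ///
    /// let result = pool.submit(task).await?;
    /// assert!(result.is_success());
    /// # Ok(())
    /// # }
    /// ```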
    pub async fn submit(&self, task: Task) -> Result<ExecutionResult> {
        let task_id = self.scheduler.submit(task).await?;
        debug!("Task {task_id} submitted");

        // Wait for result (polling-based for v1.0, event-driven in v1.1)
        loop {
            if let Some(result) = self.scheduler.get_result(task_id).await {
                self.scheduler.remove_result(task_id).await;
                return Ok(result);
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        }
    }

    /// Returns the total capacity (number of workers).
    #[must_use]
    pub const fn capacity(&self) -> usize {
        self.num_workers
    }

    /// Returns the number of pending tasks.
    pub async fn pending_tasks(&self) -> usize {
        self.scheduler.pending_count().await
    }

    /// Shuts down the pool gracefully.
    ///
    /// Clears any pending tasks and waits for all workers to stop.
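    ///
    /// # Example
    ///
    /// A minimal sketch (consumes the pool, so call it last):
    ///
    /// ```no_run
    /// # use repartir::Pool;
    /// # #[tokio::main]
    /// # async fn main() -> repartir::error::Result<()> {
    /// let pool = Pool::builder().cpu_workers(2).build()?;
    /// // ... submit work ...
    /// pool.shutdown().await;
    /// # Ok(())
    /// # }
    /// ```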
    pub async fn shutdown(self) {
        info!("Shutting down pool");
        self.scheduler.clear().await;
        self.workers.write().await.shutdown().await;
    }
}

/// Builder for `Pool`.
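///
/// # Example
///
/// A minimal construction sketch (the worker count and queue size are
/// illustrative):
///
/// ```no_run
/// use repartir::Pool;
///
/// # #[tokio::main]
/// # async fn main() -> repartir::error::Result<()> {
/// let pool = Pool::builder()
///     .cpu_workers(4)
///     .max_queue_size(1_000)
///     .build()?;
/// # Ok(())
/// # }
/// ```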
#[derive(Default)]
pub struct PoolBuilder {
    cpu_workers: Option<usize>,
    max_queue_size: Option<usize>,
}

impl PoolBuilder {
    /// Sets the number of CPU workers.
    #[must_use]
    pub const fn cpu_workers(mut self, count: usize) -> Self {
        self.cpu_workers = Some(count);
        self
    }

    /// Sets the maximum queue size.
    #[must_use]
    pub const fn max_queue_size(mut self, size: usize) -> Self {
        self.max_queue_size = Some(size);
        self
    }

    /// Builds the pool and starts workers.
    ///
    /// # Errors
    ///
    /// Returns an error if no workers are configured.
    ///
    /// # Panics
    ///
    /// Will panic if `cpu_executor` is None (should never happen).
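    ///
    /// # Example
    ///
    /// A builder with no workers is rejected (mirrors the behaviour covered in
    /// the tests below):
    ///
    /// ```no_run
    /// use repartir::Pool;
    ///
    /// let pool = Pool::builder().build();
    /// assert!(pool.is_err()); // at least one worker type is required
    /// ```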
    pub fn build(self) -> Result<Pool> {
        let cpu_workers = self.cpu_workers.unwrap_or(0);

        if cpu_workers == 0 {
            return Err(RepartirError::InvalidTask {
                reason: "At least one worker type must be configured".to_string(),
            });
        }

        let max_queue_size = self.max_queue_size.unwrap_or(10_000);
        let scheduler = Arc::new(Scheduler::with_capacity(max_queue_size));
        let cpu_executor = Arc::new(CpuExecutor::new());

        info!("Initializing pool with {cpu_workers} CPU workers");

        // Spawn worker tasks
        let mut workers = JoinSet::new();
        for worker_id in 0..cpu_workers {
            let scheduler = Arc::clone(&scheduler);
            let executor = Arc::clone(&cpu_executor);

            workers.spawn(async move {
                debug!("Worker {worker_id} started");
                loop {
                    // Poll for tasks
                    if let Some(task) = scheduler.next_task().await {
                        match executor.execute(task).await {
                            Ok(result) => {
                                scheduler.store_result(result).await;
                            }
                            Err(e) => {
                                debug!("Task execution failed: {e}");
                            }
                        }
                    } else {
                        // No tasks, sleep briefly
                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                    }
                }
            });
        }

        Ok(Pool {
            scheduler,
            cpu_executor: Some(cpu_executor),
            num_workers: cpu_workers,
            workers: Arc::new(RwLock::new(workers)),
        })
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use task::Backend;

    #[tokio::test]
    async fn test_pool_builder() {
        let pool = Pool::builder().cpu_workers(2).max_queue_size(100).build();

        assert!(pool.is_ok());
        let p = pool.expect("Pool build should succeed");
        assert_eq!(p.capacity(), 2);
        assert_eq!(p.pending_tasks().await, 0);

        p.shutdown().await;
    }

    #[tokio::test]
    async fn test_pool_submit_task() {
        let pool = Pool::builder()
            .cpu_workers(2)
            .build()
            .expect("Pool build should succeed");

        #[cfg(unix)]
        {
            let task = Task::builder()
                .binary("/bin/echo")
                .arg("test")
                .backend(Backend::Cpu)
                .build()
                .expect("Task build should succeed");

            let result = pool.submit(task).await;
            assert!(result.is_ok());

            let exec_result = result.expect("Task execution should succeed");
            assert!(exec_result.is_success());
            assert_eq!(
                exec_result
                    .stdout_str()
                    .expect("stdout should be UTF-8")
                    .trim(),
                "test"
            );
        }

        pool.shutdown().await;
    }

    #[tokio::test]
    async fn test_pool_no_workers_error() {
        let pool = Pool::builder().build();
        assert!(pool.is_err());
    }
}