scirs2_core/array_protocol/distributed_training.rs

// Copyright (c) 2025, `SciRS2` Team
//
// Licensed under either of
//
// * Apache License, Version 2.0
//   (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
// * MIT license
//   (LICENSE-MIT or http://opensource.org/licenses/MIT)
//
// at your option.
//

//! Distributed training support for the array protocol.
//!
//! This module provides utilities for distributed training of neural networks
//! using the array protocol. It includes data-parallel and model-parallel
//! training strategies, parameter synchronization, and distributed optimization.
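//!
//! # Example
//!
//! A minimal sketch of wiring up data-parallel training through the factory. The
//! `trainer`, `dataset`, and `train_loader` values are assumed to come from the
//! `training` module and their construction is omitted; the paths shown are
//! indicative only.
//!
//! ```ignore
//! use scirs2_core::array_protocol::distributed_training::{
//!     DistributedStrategy, DistributedTrainingConfig, DistributedTrainingFactory,
//! };
//!
//! let config = DistributedTrainingConfig {
//!     strategy: DistributedStrategy::DataParallel,
//!     numworkers: 4,
//!     rank: 0,
//!     is_master: true,
//!     ..Default::default()
//! };
//!
//! // Partition the dataset for this worker and wrap the trainer.
//! let dist_dataset = DistributedTrainingFactory::create_dataset(dataset, &config);
//! let mut dist_trainer = DistributedTrainingFactory::create_trainer(trainer, config);
//! dist_trainer.train(&mut train_loader, 10, None)?;
//! ```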

use std::fmt;
use std::sync::Arc;

use crate::array_protocol::neural::Sequential;
use crate::array_protocol::training::{DataLoader, Dataset, Metrics, Trainer, TrainingCallback};
use crate::array_protocol::ArrayProtocol;
use crate::error::{CoreError, CoreResult, ErrorContext};

/// Distributed training strategy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DistributedStrategy {
    /// Data parallelism - same model on each worker, different data.
    DataParallel,

    /// Model parallelism - different parts of the model on each worker.
    ModelParallel,

    /// Hybrid parallelism - combination of data and model parallelism.
    HybridParallel,

    /// Pipeline parallelism - model stages executed in a pipeline.
    PipelineParallel,
}

impl fmt::Display for DistributedStrategy {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DataParallel => write!(f, "DataParallel"),
            Self::ModelParallel => write!(f, "ModelParallel"),
            Self::HybridParallel => write!(f, "HybridParallel"),
            Self::PipelineParallel => write!(f, "PipelineParallel"),
        }
    }
}

/// Configuration for distributed training.
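///
/// # Example
///
/// A minimal sketch of a per-worker configuration, overriding only the fields of
/// interest and keeping the rest at their defaults (the module path is indicative).
///
/// ```ignore
/// use scirs2_core::array_protocol::distributed_training::{
///     DistributedStrategy, DistributedTrainingConfig,
/// };
///
/// // Worker 1 of 4 in a data-parallel run, synchronizing every 8 batches.
/// let config = DistributedTrainingConfig {
///     strategy: DistributedStrategy::DataParallel,
///     numworkers: 4,
///     rank: 1,
///     is_master: false,
///     syncinterval: 8,
///     ..Default::default()
/// };
/// assert_eq!(config.backend, "threaded");
/// ```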
#[derive(Debug, Clone)]
pub struct DistributedTrainingConfig {
    /// Distributed training strategy.
    pub strategy: DistributedStrategy,

    /// Number of workers.
    pub numworkers: usize,

    /// Rank of the current worker.
    pub rank: usize,

    /// Whether this worker is the master.
    pub is_master: bool,

    /// Synchronization interval (in batches).
    pub syncinterval: usize,

    /// Communication backend.
    pub backend: String,

    /// Whether to use mixed precision training.
    pub mixed_precision: bool,

    /// Gradient accumulation steps.
    pub gradient_accumulation_steps: usize,
}

impl Default for DistributedTrainingConfig {
    fn default() -> Self {
        Self {
            strategy: DistributedStrategy::DataParallel,
            numworkers: 1,
            rank: 0,
            is_master: true,
            syncinterval: 1,
            backend: "threaded".to_string(),
            mixed_precision: false,
            gradient_accumulation_steps: 1,
        }
    }
}

/// A node in a distributed training cluster.
#[allow(dead_code)]
pub struct DistributedNode {
    /// Configuration for the node.
    config: DistributedTrainingConfig,

    /// The model being trained.
    model: Sequential,

    /// Communication channel to other nodes (kept private to avoid warning).
    channel: CommunicationChannel,
}

impl DistributedNode {
    /// Create a new distributed node.
    pub fn new(
        model: Sequential,
        config: DistributedTrainingConfig,
        channel: Box<dyn DistributedCommunication>,
    ) -> Self {
        Self {
            config,
            model,
            channel: CommunicationChannel::new(channel),
        }
    }

    /// Synchronize model parameters with other nodes.
    pub fn synchronize_parameters(&mut self) -> CoreResult<()> {
        match self.config.strategy {
            DistributedStrategy::DataParallel => {
                // In data parallelism, we average the gradients across workers
                self.average_gradients()?;
            }
            DistributedStrategy::ModelParallel => {
                // In model parallelism, we exchange activations and gradients
                // between adjacent layers
                self.exchange_activations_and_gradients()?;
            }
            DistributedStrategy::HybridParallel => {
                // In hybrid parallelism, we do a combination of both
                self.average_gradients()?;
                self.exchange_activations_and_gradients()?;
            }
            DistributedStrategy::PipelineParallel => {
                // In pipeline parallelism, we maintain a pipeline of batches
                self.pipeline_forward_backward()?;
            }
        }

        Ok(())
    }

    /// Average gradients across workers.
    fn average_gradients(&self) -> CoreResult<()> {
        // This is a simplified implementation for demonstration purposes.
        // In a real implementation, this would use the DistributedCommunication
        // channel to exchange gradients with other workers.

        // 1. Get model parameters
        let params = self.model.parameters();

        // 2. For each parameter, send its gradient to the other workers,
        //    receive theirs, and average.
        for _param in params {
            // Example: in a real implementation, we would do something like
            // (assuming the parameter type exposes gradient accessors):
            // let gradient = param.grad()?;
            // let averaged_gradient = self.channel.inner().all_reduce(gradient, "mean")?;
            // param.set_grad(averaged_gradient)?;
        }

        Ok(())
    }

    /// Exchange activations and gradients between adjacent layers.
    fn exchange_activations_and_gradients(&self) -> CoreResult<()> {
        // This is a simplified implementation for demonstration purposes.
        // In a real implementation, this would use the DistributedCommunication
        // channel to exchange activations and gradients with adjacent workers.

        // For model parallelism, each worker has a subset of the model's layers.
        // During forward pass:
        //   - Worker i computes activations for its layers
        //   - Worker i sends activations to worker i+1
        //   - Worker i+1 receives activations from worker i
        //
        // During backward pass:
        //   - Worker i+1 computes gradients for its layers
        //   - Worker i+1 sends gradients to worker i
        //   - Worker i receives gradients from worker i+1
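        //
        // Sketch (illustrative only), using the communication channel defined in
        // this module; `forward_output` and `backward_grad` stand in for tensors
        // produced by this worker's layer stage:
        // if self.config.rank + 1 < self.config.numworkers {
        //     self.channel.inner().send(forward_output, self.config.rank + 1)?;
        //     let incoming_grad = self.channel.inner().recv(self.config.rank + 1)?;
        // }
        // if self.config.rank > 0 {
        //     let incoming_activation = self.channel.inner().recv(self.config.rank - 1)?;
        //     self.channel.inner().send(backward_grad, self.config.rank - 1)?;
        // }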

        Ok(())
    }

    /// Implement pipeline parallelism.
    fn pipeline_forward_backward(&self) -> CoreResult<()> {
        // This is a simplified implementation for demonstration purposes.
        // In a real implementation, this would maintain a pipeline of mini-batches.

        // In pipeline parallelism:
        // - The model is divided into stages, with each stage on a different worker
        // - Multiple mini-batches are processed concurrently
        // - When worker i finishes processing a mini-batch, it sends the activations
        //   to worker i+1 and starts processing the next mini-batch
        // - This creates a pipeline where different workers are processing different
        //   mini-batches at the same time
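        //
        // Sketch of a naive schedule (illustrative only): `run_stage` and
        // `micro_batches` are hypothetical placeholders for this worker's layer
        // stage and its queue of mini-batches:
        // for micro_batch in micro_batches {
        //     let activations = run_stage(micro_batch)?;
        //     if self.config.rank + 1 < self.config.numworkers {
        //         // Hand the activations to the next stage, then immediately
        //         // start on the next mini-batch instead of waiting.
        //         self.channel.inner().send(activations, self.config.rank + 1)?;
        //     }
        // }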

        Ok(())
    }
}

/// Trait for distributed communication between nodes.
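///
/// # Example
///
/// A minimal sketch using the in-process mock defined below; a real backend
/// would implement the same interface. Here `tensor` stands in for any boxed
/// `ArrayProtocol` value.
///
/// ```ignore
/// let channel = MockDistributedCommunication::new(2, 0);
/// let reduced = channel.all_reduce(tensor, "mean")?;
/// channel.barrier()?;
/// ```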
pub trait DistributedCommunication: Send + Sync {
    /// Send a tensor to another worker.
    fn send(&self, tensor: Box<dyn ArrayProtocol>, destination: usize) -> CoreResult<()>;

    /// Receive a tensor from another worker.
    fn recv(&self, source: usize) -> CoreResult<Box<dyn ArrayProtocol>>;

    /// Broadcast a tensor from the master to all workers.
    fn broadcast(&self, tensor: Box<dyn ArrayProtocol>) -> CoreResult<Box<dyn ArrayProtocol>>;

    /// Gather tensors from all workers to the master.
    fn gather(&self, tensor: Box<dyn ArrayProtocol>) -> CoreResult<Vec<Box<dyn ArrayProtocol>>>;

    /// Scatter tensors from the master to all workers.
    fn scatter(&self, tensors: Vec<Box<dyn ArrayProtocol>>) -> CoreResult<Box<dyn ArrayProtocol>>;

    /// Reduce tensors from all workers to the master.
    fn reduce(
        &self,
        tensor: Box<dyn ArrayProtocol>,
        op: &str,
    ) -> CoreResult<Box<dyn ArrayProtocol>>;

    /// All-reduce tensors across all workers.
    fn all_reduce(
        &self,
        tensor: Box<dyn ArrayProtocol>,
        op: &str,
    ) -> CoreResult<Box<dyn ArrayProtocol>>;

    /// All-gather tensors from all workers to all workers.
    fn all_gather(&self, tensor: Box<dyn ArrayProtocol>)
        -> CoreResult<Vec<Box<dyn ArrayProtocol>>>;

    /// Barrier synchronization.
    fn barrier(&self) -> CoreResult<()>;

    /// Clone this communication channel.
    fn box_clone(&self) -> Box<dyn DistributedCommunication>;
}

/// A wrapper type that makes `Box<dyn DistributedCommunication>` cloneable.
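///
/// # Example
///
/// A minimal sketch: the wrapper stores the boxed implementation behind an `Arc`,
/// so clones share one underlying channel.
///
/// ```ignore
/// let channel = CommunicationChannel::new(Box::new(MockDistributedCommunication::new(2, 0)));
/// let channel_clone = channel.clone();
/// channel_clone.inner().barrier()?;
/// ```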
#[derive(Clone)]
pub struct CommunicationChannel(Arc<Box<dyn DistributedCommunication>>);

impl CommunicationChannel {
    /// Create a new communication channel from a communication implementation.
    pub fn new(comm: Box<dyn DistributedCommunication>) -> Self {
        Self(Arc::new(comm))
    }

    /// Get the underlying communication implementation.
    pub fn inner(&self) -> &dyn DistributedCommunication {
        self.0.as_ref().as_ref()
    }
}

/// Make the `Box<dyn DistributedCommunication>` cloneable via `box_clone`.
impl Clone for Box<dyn DistributedCommunication> {
    fn clone(&self) -> Self {
        self.box_clone()
    }
}

/// A mock implementation of distributed communication for testing.
pub struct MockDistributedCommunication {
    /// Number of workers.
    numworkers: usize,

    /// Rank of the current worker.
    rank: usize,
}

impl MockDistributedCommunication {
    /// Create a new mock distributed communication channel.
    pub fn new(numworkers: usize, rank: usize) -> Self {
        Self { numworkers, rank }
    }
}

impl DistributedCommunication for MockDistributedCommunication {
    fn send(&self, _tensor: Box<dyn ArrayProtocol>, _destination: usize) -> CoreResult<()> {
        // In a real implementation, this would send the tensor to the destination worker
        Ok(())
    }

    fn recv(&self, _source: usize) -> CoreResult<Box<dyn ArrayProtocol>> {
        // In a real implementation, this would receive a tensor from the source worker
        Err(CoreError::NotImplementedError(ErrorContext::new(
            "recv not implemented for MockDistributedCommunication".to_string(),
        )))
    }

    fn broadcast(&self, tensor: Box<dyn ArrayProtocol>) -> CoreResult<Box<dyn ArrayProtocol>> {
        // In a real implementation, this would broadcast the tensor to all workers
        Ok(tensor)
    }

    fn gather(&self, tensor: Box<dyn ArrayProtocol>) -> CoreResult<Vec<Box<dyn ArrayProtocol>>> {
        // In a real implementation, this would gather tensors from all workers
        Ok(vec![tensor])
    }

    fn scatter(&self, tensors: Vec<Box<dyn ArrayProtocol>>) -> CoreResult<Box<dyn ArrayProtocol>> {
        // In a real implementation, this would scatter tensors to all workers
        if tensors.is_empty() {
            return Err(CoreError::InvalidArgument(ErrorContext::new(
                "Empty tensors list for scatter".to_string(),
            )));
        }

        Ok(tensors[0].clone())
    }

    fn reduce(
        &self,
        tensor: Box<dyn ArrayProtocol>,
        op: &str,
    ) -> CoreResult<Box<dyn ArrayProtocol>> {
        // In a real implementation, this would reduce tensors across all workers
        match op {
            "sum" | "mean" => Ok(tensor),
            _ => Err(CoreError::InvalidArgument(ErrorContext::new(format!(
                "Unknown reduction operation: {op}"
            )))),
        }
    }

    fn all_reduce(
        &self,
        tensor: Box<dyn ArrayProtocol>,
        op: &str,
    ) -> CoreResult<Box<dyn ArrayProtocol>> {
        // In a real implementation, this would all-reduce tensors across all workers
        match op {
            "sum" | "mean" => Ok(tensor),
            _ => Err(CoreError::InvalidArgument(ErrorContext::new(format!(
                "Unknown reduction operation: {op}"
            )))),
        }
    }

    fn all_gather(
        &self,
        tensor: Box<dyn ArrayProtocol>,
    ) -> CoreResult<Vec<Box<dyn ArrayProtocol>>> {
        // In a real implementation, this would all-gather tensors from all workers
        Ok(vec![tensor])
    }

    fn barrier(&self) -> CoreResult<()> {
        // In a real implementation, this would synchronize all workers
        Ok(())
    }

    fn box_clone(&self) -> Box<dyn DistributedCommunication> {
        Box::new(MockDistributedCommunication {
            numworkers: self.numworkers,
            rank: self.rank,
        })
    }
}

/// Distributed dataset that partitions data across workers.
#[allow(dead_code)]
pub struct DistributedDataset {
    /// The underlying dataset.
    dataset: Box<dyn Dataset>,

    /// Number of workers (kept private to avoid warning).
    numworkers: usize,

    /// Rank of the current worker (kept private to avoid warning).
    rank: usize,

    /// Indices of samples assigned to this worker.
    indices: Vec<usize>,
}

impl DistributedDataset {
    /// Create a new distributed dataset.
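    ///
    /// Samples are split as evenly as possible: with 10 samples and
    /// `numworkers = 3`, ranks 0, 1, and 2 receive the index ranges `0..4`,
    /// `4..7`, and `7..10` respectively.
    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an `InMemoryDataset` built as in this module's
    /// tests:
    ///
    /// ```ignore
    /// let dataset = Box::new(InMemoryDataset::from_arrays(inputs, targets)); // 10 samples
    /// let shard = DistributedDataset::new(dataset, 3, 1); // worker 1 of 3
    /// assert_eq!(shard.len(), 3); // global indices 4..7
    /// ```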
    pub fn new(dataset: Box<dyn Dataset>, numworkers: usize, rank: usize) -> Self {
        let num_samples = dataset.len();
        let samples_per_worker = num_samples / numworkers;
        let remainder = num_samples % numworkers;

        let start = if rank < remainder {
            rank * (samples_per_worker + 1)
        } else {
            rank * samples_per_worker + remainder
        };

        let end = if rank < remainder {
            start + samples_per_worker + 1
        } else {
            start + samples_per_worker
        };

        let indices = (start..end).collect();

        Self {
            dataset,
            numworkers,
            rank,
            indices,
        }
    }
}

impl Dataset for DistributedDataset {
    fn len(&self) -> usize {
        self.indices.len()
    }

    fn get(&self, index: usize) -> Option<(Box<dyn ArrayProtocol>, Box<dyn ArrayProtocol>)> {
        if index >= self.len() {
            return None;
        }

        let global_index = self.indices[index];
        self.dataset.get(global_index)
    }

    fn inputshape(&self) -> Vec<usize> {
        self.dataset.inputshape()
    }

    fn outputshape(&self) -> Vec<usize> {
        self.dataset.outputshape()
    }
}

/// Distributed trainer for handling distributed training.
#[allow(dead_code)]
pub struct DistributedTrainer {
    /// The underlying trainer.
    trainer: Trainer,

    /// Configuration for distributed training.
    config: DistributedTrainingConfig,

    /// Communication channel to other nodes.
    channel: CommunicationChannel,

    /// Batch counter for synchronization (kept private to avoid warning).
    batch_counter: usize,
}

impl DistributedTrainer {
    /// Create a new distributed trainer.
    pub fn new(
        trainer: Trainer,
        config: DistributedTrainingConfig,
        channel: Box<dyn DistributedCommunication>,
    ) -> Self {
        Self {
            trainer,
            config,
            channel: CommunicationChannel::new(channel),
            batch_counter: 0,
        }
    }

    /// Train the model in a distributed setting.
    pub fn train(
        &mut self,
        train_loader: &mut DataLoader,
        num_epochs: usize,
        val_loader: Option<&mut DataLoader>,
    ) -> CoreResult<()> {
        // Synchronize initial model parameters
        self.synchronize_parameters()?;

        // Train the model
        if self.config.strategy == DistributedStrategy::DataParallel {
            // For data parallelism, we can use the regular trainer
            // but with periodic parameter synchronization
            self.train_data_parallel(train_loader, num_epochs, val_loader)?;
        } else {
            // For other strategies, we need custom training loops
            match self.config.strategy {
                DistributedStrategy::ModelParallel => {
                    self.train_model_parallel(train_loader, num_epochs, val_loader)?;
                }
                DistributedStrategy::HybridParallel => {
                    self.train_hybrid_parallel(train_loader, num_epochs, val_loader)?;
                }
                DistributedStrategy::PipelineParallel => {
                    self.train_pipeline_parallel(train_loader, num_epochs, val_loader)?;
                }
                _ => unreachable!(),
            }
        }

        Ok(())
    }

    /// Synchronize model parameters with other workers.
    fn synchronize_parameters(&self) -> CoreResult<()> {
        // In a real implementation, this would synchronize model parameters
        // across all workers.

        // If this is the master worker, broadcast parameters to all workers;
        // otherwise, receive parameters from the master.
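        //
        // Sketch (illustrative only): `param_tensor` stands in for each boxed
        // model parameter; the non-master path would overwrite local parameters
        // with the broadcast values:
        // if self.config.is_master {
        //     self.channel.inner().broadcast(param_tensor)?;
        // } else {
        //     let param_tensor = self.channel.inner().broadcast(param_tensor)?;
        // }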

        // For simplicity, we'll just call barrier to synchronize all workers
        self.channel.inner().barrier()?;

        Ok(())
    }

    /// Train the model using data parallelism.
    fn train_data_parallel(
        &mut self,
        train_loader: &mut DataLoader,
        num_epochs: usize,
        val_loader: Option<&mut DataLoader>,
    ) -> CoreResult<()> {
        // Create a callback for parameter synchronization
        let _sync_callback = ParameterSyncCallback::new(
            self.config.syncinterval,
            self.channel.inner().box_clone(),
        );

        // Add the callback to the trainer (not yet wired up):
        // self.trainer.add_callback(Box::new(_sync_callback));

        // Train the model using the regular trainer
        self.trainer.train(train_loader, num_epochs, val_loader)?;

        Ok(())
    }

    /// Train the model using model parallelism.
    fn train_model_parallel(
        &mut self,
        _train_loader: &mut DataLoader,
        _num_epochs: usize,
        _val_loader: Option<&mut DataLoader>,
    ) -> CoreResult<()> {
        // This is a simplified implementation for demonstration purposes.
        // In a real implementation, this would implement a custom training loop
        // that exchanges activations and gradients between workers.

        Ok(())
    }

    /// Train the model using hybrid parallelism.
    fn train_hybrid_parallel(
        &mut self,
        _train_loader: &mut DataLoader,
        _num_epochs: usize,
        _val_loader: Option<&mut DataLoader>,
    ) -> CoreResult<()> {
        // This is a simplified implementation for demonstration purposes.
        // In a real implementation, this would implement a custom training loop
        // that combines data and model parallelism.

        Ok(())
    }

    /// Train the model using pipeline parallelism.
    fn train_pipeline_parallel(
        &mut self,
        _train_loader: &mut DataLoader,
        _num_epochs: usize,
        _val_loader: Option<&mut DataLoader>,
    ) -> CoreResult<()> {
        // This is a simplified implementation for demonstration purposes.
        // In a real implementation, this would implement a custom training loop
        // that uses pipeline parallelism.

        Ok(())
    }
}

/// Callback for synchronizing parameters between workers.
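///
/// # Example
///
/// A minimal sketch constructing the callback over the in-process mock channel,
/// synchronizing every 4 batches:
///
/// ```ignore
/// let callback = ParameterSyncCallback::new(
///     4,
///     Box::new(MockDistributedCommunication::new(2, 0)),
/// );
/// ```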
pub struct ParameterSyncCallback {
    /// Synchronization interval (in batches).
    syncinterval: usize,

    /// Batch counter.
    batch_counter: usize,

    /// Communication channel to other workers.
    channel: CommunicationChannel,
}

impl ParameterSyncCallback {
    /// Create a new parameter synchronization callback.
    pub fn new(syncinterval: usize, channel: Box<dyn DistributedCommunication>) -> Self {
        Self {
            syncinterval,
            batch_counter: 0,
            channel: CommunicationChannel::new(channel),
        }
    }
}

impl TrainingCallback for ParameterSyncCallback {
    fn on_epoch_start(&mut self, _epoch: usize, _numepochs: usize) {
        // Reset the batch counter at the start of each epoch
        self.batch_counter = 0;
    }

    fn on_epoch_end(&mut self, _epoch: usize, _num_epochs: usize, _metrics: &Metrics) {
        // Synchronize parameters at the end of each epoch.
        // This is a simplified implementation for demonstration purposes.
        // In a real implementation, this would call channel.all_reduce() for each parameter.

        match self.channel.inner().barrier() {
            Ok(()) => {}
            Err(e) => eprintln!("Error in barrier synchronization: {e}"),
        }
    }

    fn on_batch_start(&mut self, _batch: usize, _numbatches: usize) {
        // No-op for this callback
    }

    fn on_batch_end(&mut self, _batch: usize, _numbatches: usize, _loss: f64) {
        // Increment the batch counter
        self.batch_counter += 1;

        // Synchronize parameters if needed
        if self.batch_counter.is_multiple_of(self.syncinterval) {
            // This is a simplified implementation for demonstration purposes.
            // In a real implementation, this would call channel.all_reduce() for each parameter.

            match self.channel.inner().barrier() {
                Ok(()) => {}
                Err(e) => eprintln!("Error in barrier synchronization: {e}"),
            }
        }
    }

    fn on_train_start(&mut self, _numepochs: usize) {
        // Synchronize initial parameters
        match self.channel.inner().barrier() {
            Ok(()) => {}
            Err(e) => eprintln!("Error in barrier synchronization: {e}"),
        }
    }

    fn on_train_end(&mut self, _metrics: &Metrics) {
        // Final synchronization
        match self.channel.inner().barrier() {
            Ok(()) => {}
            Err(e) => eprintln!("Error in barrier synchronization: {e}"),
        }
    }
}

/// Factory for creating distributed training components.
pub struct DistributedTrainingFactory;

impl DistributedTrainingFactory {
    /// Create a new distributed dataset.
    pub fn create_dataset(
        dataset: Box<dyn Dataset>,
        config: &DistributedTrainingConfig,
    ) -> Box<dyn Dataset> {
        Box::new(DistributedDataset::new(
            dataset,
            config.numworkers,
            config.rank,
        ))
    }

    /// Create a new distributed trainer.
    pub fn create_trainer(
        trainer: Trainer,
        config: DistributedTrainingConfig,
    ) -> DistributedTrainer {
        // Create communication channel
        let channel: Box<dyn DistributedCommunication> = match config.backend.as_str() {
            "threaded" => Box::new(MockDistributedCommunication::new(
                config.numworkers,
                config.rank,
            )),
            // Other backends would be added here
            _ => Box::new(MockDistributedCommunication::new(
                config.numworkers,
                config.rank,
            )),
        };

        DistributedTrainer::new(trainer, config, channel)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::array_protocol::training::InMemoryDataset;
    use crate::array_protocol::NdarrayWrapper;
    use ndarray::Array2;

    #[test]
    fn test_distributed_dataset() {
        // Create a dataset
        let inputs = Array2::<f64>::ones((10, 5));
        let targets = Array2::<f64>::zeros((10, 2));
        let dataset = Box::new(InMemoryDataset::from_arrays(inputs, targets));

        // Create a distributed dataset
        let dist_dataset = DistributedDataset::new(dataset, 2, 0);

        // Check properties
        assert_eq!(dist_dataset.len(), 5);
        assert_eq!(dist_dataset.inputshape(), vec![5]);
        assert_eq!(dist_dataset.outputshape(), vec![2]);

        // Get a sample
        let (input, target) = dist_dataset.get(0).unwrap();
        assert!(input
            .as_any()
            .downcast_ref::<NdarrayWrapper<f64, ndarray::IxDyn>>()
            .is_some());
        assert!(target
            .as_any()
            .downcast_ref::<NdarrayWrapper<f64, ndarray::IxDyn>>()
            .is_some());
    }
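
    // Supplementary check (illustrative): the uneven-split arithmetic in
    // `DistributedDataset::new` - 10 samples over 3 workers shard as 4/3/3.
    #[test]
    fn test_distributed_dataset_uneven_split() {
        let expected_lens: [usize; 3] = [4, 3, 3];
        for (rank, expected_len) in expected_lens.iter().enumerate() {
            // Build a fresh 10-sample dataset for each rank, as in the test above.
            let inputs = Array2::<f64>::ones((10, 5));
            let targets = Array2::<f64>::zeros((10, 2));
            let dataset = Box::new(InMemoryDataset::from_arrays(inputs, targets));

            let dist_dataset = DistributedDataset::new(dataset, 3, rank);
            assert_eq!(dist_dataset.len(), *expected_len);
        }
    }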

    #[test]
    fn test_mock_distributed_communication() {
        // Create a mock distributed communication channel
        let channel = MockDistributedCommunication::new(2, 0);

        // Create a tensor
        let tensor = NdarrayWrapper::new(Array2::<f64>::ones((2, 2)));
        let boxed_tensor = Box::new(tensor);

        // Test broadcast
        let result = channel.broadcast(boxed_tensor.clone());
        assert!(result.is_ok());

        // Test all_reduce
        let result = channel.all_reduce(boxed_tensor.clone(), "mean");
        assert!(result.is_ok());

        // Test barrier
        let result = channel.barrier();
        assert!(result.is_ok());
    }
}